import pyangbind.lib.pybindJSON as pbJ
import sys
import os
+
# Bindings must present in ${WORKSPACE}
-workspace = os.environ['WORKSPACE'] + '/odl-lispflowmapping-yang-files'
+workspace = os.environ["WORKSPACE"] + "/odl-lispflowmapping-yang-files"
"""Helper Functions """
objB: eid object of pyangbind generated class
"""
for name in dir(objB):
- if name[:4] == '_eid':
+ if name[:4] == "_eid":
value = getattr(objB, name)
try:
setattr(objA, name, value)
objB: rloc object of pyangbind generated class
"""
for name in dir(objB):
- if name[:5] == '_rloc':
+ if name[:5] == "_rloc":
value = getattr(objB, name)
try:
setattr(objA, name, value)
"""
new_obj = {}
for key, value in obj.items():
- if key == 'hop':
+ if key == "hop":
for hop in value:
- values = hop['hop-id'].split(' ')
- hop['hop-id'] = values[0] + " " + values[1]
- if values[2] != '':
- hop['lrs-bits'] = ' '.join(values[2:])[:-1]
+ values = hop["hop-id"].split(" ")
+ hop["hop-id"] = values[0] + " " + values[1]
+ if values[2] != "":
+ hop["lrs-bits"] = " ".join(values[2:])[:-1]
new_obj[key] = value
if isinstance(value, dict):
new_obj[key] = clean_hops(value)
"""
if laddr_obj is None:
sys.path.insert(0, workspace)
- from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input
+ from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
+ input,
+ )
+
rpc_input = input()
laddr_obj = rpc_input.mapping_record.eid
if vni:
laddr_obj.virtual_network_id = vni
- eid_string = eid_string.split(':')
- prefix, text = eid_string[0], ':'.join(eid_string[1:])
+ eid_string = eid_string.split(":")
+ prefix, text = eid_string[0], ":".join(eid_string[1:])
if prefix:
- if prefix == 'srcdst':
+ if prefix == "srcdst":
# Example: srcdst:192.0.2.1/32|192.0.2.2/32
- laddr_obj.address_type = 'laddr:source-dest-key-lcaf'
- text = text.split('|')
+ laddr_obj.address_type = "laddr:source-dest-key-lcaf"
+ text = text.split("|")
laddr_obj.source_dest_key.source = text[0]
laddr_obj.source_dest_key.dest = text[1]
- elif prefix == 'no':
+ elif prefix == "no":
# Example: no:
- laddr_obj.address_type = 'laddr:no-address-afi'
- elif prefix == 'ipv4':
- if '/' in text:
+ laddr_obj.address_type = "laddr:no-address-afi"
+ elif prefix == "ipv4":
+ if "/" in text:
# Case: ipv4-prefix
- laddr_obj.address_type = 'laddr:ipv4-prefix-afi'
+ laddr_obj.address_type = "laddr:ipv4-prefix-afi"
laddr_obj.ipv4_prefix = text
else:
# Case: ipv4
- laddr_obj.address_type = 'laddr:ipv4-afi'
+ laddr_obj.address_type = "laddr:ipv4-afi"
laddr_obj.ipv4 = text
- elif prefix == 'ipv6':
- if '/' in text:
+ elif prefix == "ipv6":
+ if "/" in text:
# Case: ipv6-prefix
- laddr_obj.address_type = 'laddr:ipv6-prefix-afi'
+ laddr_obj.address_type = "laddr:ipv6-prefix-afi"
laddr_obj.ipv6_prefix = text
else:
- laddr_obj.address_type = 'laddr:ipv6-afi'
+ laddr_obj.address_type = "laddr:ipv6-afi"
laddr_obj.ipv6 = text
- elif prefix == 'mac':
+ elif prefix == "mac":
# Example: mac:00:00:5E:00:53:00
- laddr_obj.address_type = 'laddr:mac-afi'
+ laddr_obj.address_type = "laddr:mac-afi"
laddr_obj.mac = text
- elif prefix == 'dn':
+ elif prefix == "dn":
# Example: dn:stringAsIs
- laddr_obj.address_type = 'laddr:distinguished-name-afi'
+ laddr_obj.address_type = "laddr:distinguished-name-afi"
laddr_obj.distinguished_name = text
- elif prefix == 'as':
+ elif prefix == "as":
# Example: as:AS64500
- laddr_obj.address_type = 'laddr:as-number-afi'
+ laddr_obj.address_type = "laddr:as-number-afi"
laddr_obj.as_number = text
- elif prefix == 'list':
+ elif prefix == "list":
# Example: list:{192.0.2.1,192.0.2.2,2001:db8::1}
- laddr_obj.address_type = 'laddr:afi-list-lcaf'
- list_elements = text[1:len(text) - 1].split(',') # removed start and end braces
+ laddr_obj.address_type = "laddr:afi-list-lcaf"
+ list_elements = text[1 : len(text) - 1].split(
+ ","
+ ) # removed start and end braces
laddr_obj.afi_list.address_list = list_elements
- elif prefix == 'appdata':
+ elif prefix == "appdata":
# Example: appdata:192.0.2.1!128!17!80-81!6667-7000
- laddr_obj.address_type = 'laddr:application-data-lcaf'
- text = text.split('!')
+ laddr_obj.address_type = "laddr:application-data-lcaf"
+ text = text.split("!")
laddr_obj.application_data.address = text[0]
laddr_obj.application_data.ip_tos = text[1]
laddr_obj.application_data.protocol = text[2]
- local_ports = text[3].split('-')
+ local_ports = text[3].split("-")
laddr_obj.application_data.local_port_low = local_ports[0]
laddr_obj.application_data.local_port_high = local_ports[1]
- remote_ports = text[4].split('-')
+ remote_ports = text[4].split("-")
laddr_obj.application_data.remote_port_low = remote_ports[0]
laddr_obj.application_data.remote_port_high = remote_ports[1]
- elif prefix == 'elp':
+ elif prefix == "elp":
# TODO: BITS_TYPE_for_lps
# Example: elp:{192.0.2.1->192.0.2.2|lps->192.0.2.3}
- laddr_obj.address_type = 'laddr:explicit-locator-path-lcaf'
- text = text[1:len(text) - 1]
- text = text.split('->') # all the hops
+ laddr_obj.address_type = "laddr:explicit-locator-path-lcaf"
+ text = text[1 : len(text) - 1]
+ text = text.split("->") # all the hops
for i in range(0, len(text)):
- cur_hop = text[i].split('|')
+ cur_hop = text[i].split("|")
address = cur_hop[0]
lrs_bits = ""
hop_id = "Hop " + str(i + 1) + " " + lrs_bits
lrs_bits += "strict "
laddr_obj.explicit_locator_path.hop.add(hop_id)
laddr_obj.explicit_locator_path.hop[hop_id].address = address
- elif prefix == 'kv':
+ elif prefix == "kv":
# Example: kv:192.0.2.1->192.0.2.2
- laddr_obj.address_type = 'laddr:key-value-address-lcaf'
- text = text.split('->')
+ laddr_obj.address_type = "laddr:key-value-address-lcaf"
+ text = text.split("->")
laddr_obj.key_value_address.key = text[0]
laddr_obj.key_value_address.value = text[1]
- elif prefix == 'sp':
+ elif prefix == "sp":
# Example: sp:42(3)
- laddr_obj.address_type = 'laddr:service-path-lcaf'
- text = text.split('(')
+ laddr_obj.address_type = "laddr:service-path-lcaf"
+ text = text.split("(")
laddr_obj.service_path.service_path_id = text[0]
laddr_obj.service_path.service_index = text[1][:-1]
eid_string: type of lisp address
vni: virtual network id
"""
- pbj_dump = pbJ.dumps(Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf")
- out_dump = '{"eid":' + pbj_dump + '}'
+ pbj_dump = pbJ.dumps(
+ Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf"
+ )
+ out_dump = '{"eid":' + pbj_dump + "}"
return Clean_JSON(out_dump)
eid_string: type of lisp address
vni: virtual network id
"""
- out_dump = pbJ.dumps(Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf")
+ out_dump = pbJ.dumps(
+ Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf"
+ )
return Clean_JSON(out_dump)
return Wrap_input(Get_LispAddress_JSON(eid_string, vni))
-def Get_LocatorRecord_Object(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1"):
+def Get_LocatorRecord_Object(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"):
""" Description: Returns locator record object from pyangbind generated classes
Returns: locator record object
Params:
loc_id: id of locator record object
"""
sys.path.insert(0, workspace)
- from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input
+ from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
+ input,
+ )
+
rpc_input = input()
lrecord_obj = rpc_input.mapping_record.LocatorRecord
# TODO: What should be the locator-id
lrecord_obj.add(loc_id)
- lrecord_ele = weights.split('/')
+ lrecord_ele = weights.split("/")
lrecord_obj[loc_id].priority = lrecord_ele[0]
lrecord_obj[loc_id].weight = lrecord_ele[1]
lrecord_obj[loc_id].multicastPriority = lrecord_ele[2]
return lrecord_obj
-def Get_LocatorRecord_JSON(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1"):
+def Get_LocatorRecord_JSON(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"):
""" Description: Returns locator record dictionary
Returns: python dictionary
Params:
flags: Three bit parameter in the sequence routed->rlocProbed->routed
loc_id: id of locator record object
"""
- pbj_dump = pbJ.dumps(Get_LocatorRecord_Object(rloc, weights, flags, loc_id), filter=True, mode="default")
+ pbj_dump = pbJ.dumps(
+ Get_LocatorRecord_Object(rloc, weights, flags, loc_id),
+ filter=True,
+ mode="default",
+ )
pbj_dict = json.loads(pbj_dump)
- pbj_dict[loc_id]['rloc'] = Get_LispAddress_Noeid_JSON(rloc)
- out_dump = '{"LocatorRecord":' + str(pbj_dict) + '}'
+ pbj_dict[loc_id]["rloc"] = Get_LispAddress_Noeid_JSON(rloc)
+ out_dump = '{"LocatorRecord":' + str(pbj_dict) + "}"
return Clean_JSON(out_dump)
-def Get_MappingRecord_Object(eid, locators, ttl=1440, authoritative=True, action='NoAction'):
+def Get_MappingRecord_Object(
+ eid, locators, ttl=1440, authoritative=True, action="NoAction"
+):
""" Description: Returns mapping record object from pyangbind generated classes.
Returns: mapping record object
Params:
action: action
"""
sys.path.insert(0, workspace)
- from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input
+ from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
+ input,
+ )
+
rpc_input = input()
mrecord_obj = rpc_input.mapping_record
mrecord_obj.recordTtl = ttl
return mrecord_obj
-def Get_MappingRecord_JSON(eid, locators, ttl=1440, authoritative=True, action='NoAction'):
+def Get_MappingRecord_JSON(
+ eid, locators, ttl=1440, authoritative=True, action="NoAction"
+):
""" Description: Returns mapping record dictionary
Returns: python dictionary
Params:
authoritative: authoritative
action: action
"""
- pbj_dump = pbJ.dumps(Get_MappingRecord_Object(eid, locators, ttl, authoritative, action), filter=True, mode="ietf")
- out_dump = '{"mapping-record":' + pbj_dump + '}'
+ pbj_dump = pbJ.dumps(
+ Get_MappingRecord_Object(eid, locators, ttl, authoritative, action),
+ filter=True,
+ mode="ietf",
+ )
+ out_dump = '{"mapping-record":' + pbj_dump + "}"
return Clean_JSON(out_dump)
key_type: key type
"""
sys.path.insert(0, workspace)
- from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_key.input import input as add_key_input
+ from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_key.input import (
+ input as add_key_input,
+ )
+
rpc_input = add_key_input()
authkey_obj = rpc_input.mapping_authkey
authkey_obj.key_string = key_string
key_string: key string
key_type: key type
"""
- pbj_dump = pbJ.dumps(Get_MappingAuthkey_Object(key_string, key_type), filter=True, mode="default")
- out_dump = '{"mapping-authkey":' + pbj_dump + '}'
+ pbj_dump = pbJ.dumps(
+ Get_MappingAuthkey_Object(key_string, key_type), filter=True, mode="default"
+ )
+ out_dump = '{"mapping-authkey":' + pbj_dump + "}"
return Clean_JSON(out_dump)