X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=csit%2Flibraries%2FJsonGenerator.py;h=bb1f9bd6e7209e6912c72f9b7dfcb54b3cfbaf84;hb=1c106006ffb33c1c60e2f8abe61e88a7bd95a7bd;hp=94caad67bcdf70cf4f6f16be91e98f5a5ccf36d0;hpb=e235045c2ad12d41416e2583609a0178c8bd9287;p=integration%2Ftest.git diff --git a/csit/libraries/JsonGenerator.py b/csit/libraries/JsonGenerator.py index 94caad67bc..bb1f9bd6e7 100644 --- a/csit/libraries/JsonGenerator.py +++ b/csit/libraries/JsonGenerator.py @@ -2,8 +2,9 @@ import json import pyangbind.lib.pybindJSON as pbJ import sys import os + # Bindings must present in ${WORKSPACE} -workspace = os.environ['WORKSPACE'] + '/odl-lispflowmapping-yang-files' +workspace = os.environ["WORKSPACE"] + "/odl-lispflowmapping-yang-files" """Helper Functions """ @@ -59,7 +60,7 @@ def copy_eid(objA, objB): objB: eid object of pyangbind generated class """ for name in dir(objB): - if name[:4] == '_eid': + if name[:4] == "_eid": value = getattr(objB, name) try: setattr(objA, name, value) @@ -75,7 +76,7 @@ def copy_rloc(objA, objB): objB: rloc object of pyangbind generated class """ for name in dir(objB): - if name[:5] == '_rloc': + if name[:5] == "_rloc": value = getattr(objB, name) try: setattr(objA, name, value) @@ -91,12 +92,12 @@ def clean_hops(obj): """ new_obj = {} for key, value in obj.items(): - if key == 'hop': + if key == "hop": for hop in value: - values = hop['hop-id'].split(' ') - hop['hop-id'] = values[0] + " " + values[1] - if values[2] != '': - hop['lrs-bits'] = ' '.join(values[2:])[:-1] + values = hop["hop-id"].split(" ") + hop["hop-id"] = values[0] + " " + values[1] + if values[2] != "": + hop["lrs-bits"] = " ".join(values[2:])[:-1] new_obj[key] = value if isinstance(value, dict): new_obj[key] = clean_hops(value) @@ -126,80 +127,85 @@ def Get_LispAddress_Object(eid_string, vni=None, laddr_obj=None): """ if laddr_obj is None: sys.path.insert(0, workspace) - from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input + from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import ( + input, + ) + rpc_input = input() laddr_obj = rpc_input.mapping_record.eid if vni: laddr_obj.virtual_network_id = vni - eid_string = eid_string.split(':') - prefix, text = eid_string[0], ':'.join(eid_string[1:]) + eid_string = eid_string.split(":") + prefix, text = eid_string[0], ":".join(eid_string[1:]) if prefix: - if prefix == 'srcdst': + if prefix == "srcdst": # Example: srcdst:192.0.2.1/32|192.0.2.2/32 - laddr_obj.address_type = 'laddr:source-dest-key-lcaf' - text = text.split('|') + laddr_obj.address_type = "laddr:source-dest-key-lcaf" + text = text.split("|") laddr_obj.source_dest_key.source = text[0] laddr_obj.source_dest_key.dest = text[1] - elif prefix == 'no': + elif prefix == "no": # Example: no: - laddr_obj.address_type = 'laddr:no-address-afi' - elif prefix == 'ipv4': - if '/' in text: + laddr_obj.address_type = "laddr:no-address-afi" + elif prefix == "ipv4": + if "/" in text: # Case: ipv4-prefix - laddr_obj.address_type = 'laddr:ipv4-prefix-afi' + laddr_obj.address_type = "laddr:ipv4-prefix-afi" laddr_obj.ipv4_prefix = text else: # Case: ipv4 - laddr_obj.address_type = 'laddr:ipv4-afi' + laddr_obj.address_type = "laddr:ipv4-afi" laddr_obj.ipv4 = text - elif prefix == 'ipv6': - if '/' in text: + elif prefix == "ipv6": + if "/" in text: # Case: ipv6-prefix - laddr_obj.address_type = 'laddr:ipv6-prefix-afi' + laddr_obj.address_type = "laddr:ipv6-prefix-afi" laddr_obj.ipv6_prefix = text else: - laddr_obj.address_type = 'laddr:ipv6-afi' + laddr_obj.address_type = "laddr:ipv6-afi" laddr_obj.ipv6 = text - elif prefix == 'mac': + elif prefix == "mac": # Example: mac:00:00:5E:00:53:00 - laddr_obj.address_type = 'laddr:mac-afi' + laddr_obj.address_type = "laddr:mac-afi" laddr_obj.mac = text - elif prefix == 'dn': + elif prefix == "dn": # Example: dn:stringAsIs - laddr_obj.address_type = 'laddr:distinguished-name-afi' + laddr_obj.address_type = "laddr:distinguished-name-afi" laddr_obj.distinguished_name = text - elif prefix == 'as': + elif prefix == "as": # Example: as:AS64500 - laddr_obj.address_type = 'laddr:as-number-afi' + laddr_obj.address_type = "laddr:as-number-afi" laddr_obj.as_number = text - elif prefix == 'list': + elif prefix == "list": # Example: list:{192.0.2.1,192.0.2.2,2001:db8::1} - laddr_obj.address_type = 'laddr:afi-list-lcaf' - list_elements = text[1:len(text) - 1].split(',') # removed start and end braces + laddr_obj.address_type = "laddr:afi-list-lcaf" + list_elements = text[1 : len(text) - 1].split( + "," + ) # removed start and end braces laddr_obj.afi_list.address_list = list_elements - elif prefix == 'appdata': + elif prefix == "appdata": # Example: appdata:192.0.2.1!128!17!80-81!6667-7000 - laddr_obj.address_type = 'laddr:application-data-lcaf' - text = text.split('!') + laddr_obj.address_type = "laddr:application-data-lcaf" + text = text.split("!") laddr_obj.application_data.address = text[0] laddr_obj.application_data.ip_tos = text[1] laddr_obj.application_data.protocol = text[2] - local_ports = text[3].split('-') + local_ports = text[3].split("-") laddr_obj.application_data.local_port_low = local_ports[0] laddr_obj.application_data.local_port_high = local_ports[1] - remote_ports = text[4].split('-') + remote_ports = text[4].split("-") laddr_obj.application_data.remote_port_low = remote_ports[0] laddr_obj.application_data.remote_port_high = remote_ports[1] - elif prefix == 'elp': + elif prefix == "elp": # TODO: BITS_TYPE_for_lps # Example: elp:{192.0.2.1->192.0.2.2|lps->192.0.2.3} - laddr_obj.address_type = 'laddr:explicit-locator-path-lcaf' - text = text[1:len(text) - 1] - text = text.split('->') # all the hops + laddr_obj.address_type = "laddr:explicit-locator-path-lcaf" + text = text[1 : len(text) - 1] + text = text.split("->") # all the hops for i in range(0, len(text)): - cur_hop = text[i].split('|') + cur_hop = text[i].split("|") address = cur_hop[0] lrs_bits = "" hop_id = "Hop " + str(i + 1) + " " + lrs_bits @@ -213,16 +219,16 @@ def Get_LispAddress_Object(eid_string, vni=None, laddr_obj=None): lrs_bits += "strict " laddr_obj.explicit_locator_path.hop.add(hop_id) laddr_obj.explicit_locator_path.hop[hop_id].address = address - elif prefix == 'kv': + elif prefix == "kv": # Example: kv:192.0.2.1->192.0.2.2 - laddr_obj.address_type = 'laddr:key-value-address-lcaf' - text = text.split('->') + laddr_obj.address_type = "laddr:key-value-address-lcaf" + text = text.split("->") laddr_obj.key_value_address.key = text[0] laddr_obj.key_value_address.value = text[1] - elif prefix == 'sp': + elif prefix == "sp": # Example: sp:42(3) - laddr_obj.address_type = 'laddr:service-path-lcaf' - text = text.split('(') + laddr_obj.address_type = "laddr:service-path-lcaf" + text = text.split("(") laddr_obj.service_path.service_path_id = text[0] laddr_obj.service_path.service_index = text[1][:-1] @@ -236,8 +242,10 @@ def Get_LispAddress_JSON(eid_string, vni=None): eid_string: type of lisp address vni: virtual network id """ - pbj_dump = pbJ.dumps(Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf") - out_dump = '{"eid":' + pbj_dump + '}' + pbj_dump = pbJ.dumps( + Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf" + ) + out_dump = '{"eid":' + pbj_dump + "}" return Clean_JSON(out_dump) @@ -248,7 +256,9 @@ def Get_LispAddress_Noeid_JSON(eid_string, vni=None): eid_string: type of lisp address vni: virtual network id """ - out_dump = pbJ.dumps(Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf") + out_dump = pbJ.dumps( + Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf" + ) return Clean_JSON(out_dump) @@ -262,7 +272,7 @@ def Get_LispAddress_JSON_And_Wrap_input(eid_string, vni=None): return Wrap_input(Get_LispAddress_JSON(eid_string, vni)) -def Get_LocatorRecord_Object(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1"): +def Get_LocatorRecord_Object(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"): """ Description: Returns locator record object from pyangbind generated classes Returns: locator record object Params: @@ -272,12 +282,15 @@ def Get_LocatorRecord_Object(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1 loc_id: id of locator record object """ sys.path.insert(0, workspace) - from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input + from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import ( + input, + ) + rpc_input = input() lrecord_obj = rpc_input.mapping_record.LocatorRecord # TODO: What should be the locator-id lrecord_obj.add(loc_id) - lrecord_ele = weights.split('/') + lrecord_ele = weights.split("/") lrecord_obj[loc_id].priority = lrecord_ele[0] lrecord_obj[loc_id].weight = lrecord_ele[1] lrecord_obj[loc_id].multicastPriority = lrecord_ele[2] @@ -290,7 +303,7 @@ def Get_LocatorRecord_Object(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1 return lrecord_obj -def Get_LocatorRecord_JSON(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1"): +def Get_LocatorRecord_JSON(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"): """ Description: Returns locator record dictionary Returns: python dictionary Params: @@ -299,14 +312,20 @@ def Get_LocatorRecord_JSON(rloc, weights='1/1/255/0', flags=0o01, loc_id="ISP1") flags: Three bit parameter in the sequence routed->rlocProbed->routed loc_id: id of locator record object """ - pbj_dump = pbJ.dumps(Get_LocatorRecord_Object(rloc, weights, flags, loc_id), filter=True, mode="default") + pbj_dump = pbJ.dumps( + Get_LocatorRecord_Object(rloc, weights, flags, loc_id), + filter=True, + mode="default", + ) pbj_dict = json.loads(pbj_dump) - pbj_dict[loc_id]['rloc'] = Get_LispAddress_Noeid_JSON(rloc) - out_dump = '{"LocatorRecord":' + str(pbj_dict) + '}' + pbj_dict[loc_id]["rloc"] = Get_LispAddress_Noeid_JSON(rloc) + out_dump = '{"LocatorRecord":' + str(pbj_dict) + "}" return Clean_JSON(out_dump) -def Get_MappingRecord_Object(eid, locators, ttl=1440, authoritative=True, action='NoAction'): +def Get_MappingRecord_Object( + eid, locators, ttl=1440, authoritative=True, action="NoAction" +): """ Description: Returns mapping record object from pyangbind generated classes. Returns: mapping record object Params: @@ -317,7 +336,10 @@ def Get_MappingRecord_Object(eid, locators, ttl=1440, authoritative=True, action action: action """ sys.path.insert(0, workspace) - from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import input + from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import ( + input, + ) + rpc_input = input() mrecord_obj = rpc_input.mapping_record mrecord_obj.recordTtl = ttl @@ -347,7 +369,9 @@ def Get_MappingRecord_Object(eid, locators, ttl=1440, authoritative=True, action return mrecord_obj -def Get_MappingRecord_JSON(eid, locators, ttl=1440, authoritative=True, action='NoAction'): +def Get_MappingRecord_JSON( + eid, locators, ttl=1440, authoritative=True, action="NoAction" +): """ Description: Returns mapping record dictionary Returns: python dictionary Params: @@ -357,8 +381,12 @@ def Get_MappingRecord_JSON(eid, locators, ttl=1440, authoritative=True, action=' authoritative: authoritative action: action """ - pbj_dump = pbJ.dumps(Get_MappingRecord_Object(eid, locators, ttl, authoritative, action), filter=True, mode="ietf") - out_dump = '{"mapping-record":' + pbj_dump + '}' + pbj_dump = pbJ.dumps( + Get_MappingRecord_Object(eid, locators, ttl, authoritative, action), + filter=True, + mode="ietf", + ) + out_dump = '{"mapping-record":' + pbj_dump + "}" return Clean_JSON(out_dump) @@ -370,7 +398,10 @@ def Get_MappingAuthkey_Object(key_string="password", key_type=1): key_type: key type """ sys.path.insert(0, workspace) - from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_key.input import input as add_key_input + from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_key.input import ( + input as add_key_input, + ) + rpc_input = add_key_input() authkey_obj = rpc_input.mapping_authkey authkey_obj.key_string = key_string @@ -385,6 +416,8 @@ def Get_MappingAuthkey_JSON(key_string="password", key_type=1): key_string: key string key_type: key type """ - pbj_dump = pbJ.dumps(Get_MappingAuthkey_Object(key_string, key_type), filter=True, mode="default") - out_dump = '{"mapping-authkey":' + pbj_dump + '}' + pbj_dump = pbJ.dumps( + Get_MappingAuthkey_Object(key_string, key_type), filter=True, mode="default" + ) + out_dump = '{"mapping-authkey":' + pbj_dump + "}" return Clean_JSON(out_dump)