Use RFC8040 URL for DAEXIM tests
[integration/test.git] / csit / libraries / JsonGenerator.py
1 import json
2 import pyangbind.lib.pybindJSON as pbJ
3 import sys
4 import os
5
6 # Bindings must present in ${WORKSPACE}
7 workspace = os.environ["WORKSPACE"] + "/odl-lispflowmapping-yang-files"
8
9 """Helper Functions """
10
11
12 def Clean_JSON(string_dump):
13     """Description: clean the pyangbind generated object
14     Return: python dictionary
15     Params:
16      string_dump: string representation of pyangbind generated dictionary
17     """
18     string_dump = string_dump.replace("odl-mappingservice:", "")
19     string_dump = string_dump.replace("laddr:", "ietf-lisp-address-types:")
20     dict_obj = clean_hops(json.loads(string_dump))
21     return dict_obj
22
23
24 def Merge(first, second):
25     """Description: merge two python dictionaries
26     Return: python dictionary
27     Params:
28      first: python dictionary
29      second: python dictionary
30     """
31     first.update(second)
32     return first
33
34
35 def Wrap_input(dict_obj):
36     """Description: Wrap input to python dictionary
37     Return: python dictionary
38     Params:
39      dict_obj: python dictionary
40     """
41     out_dump = {"input": dict_obj}
42     return out_dump
43
44
45 def Merge_And_Wrap_input(first, second):
46     """Description: Merge two python dictionaries and wrap input
47     Return: python dictionary
48     Params:
49      first: python dictionary
50      second: python dictionary
51     """
52     return Wrap_input(Merge(first, second))
53
54
55 def copy_eid(objA, objB):
56     """Description: Copy value of attributes from one eid object to other
57     Return: None
58     Params:
59      objA: eid object of pyangbind generated class
60      objB: eid object of pyangbind generated class
61     """
62     for name in dir(objB):
63         if name[:4] == "_eid":
64             value = getattr(objB, name)
65             try:
66                 setattr(objA, name, value)
67             except AttributeError:
68                 print("%s giving attribute error in %s" % (name, objA))
69
70
71 def copy_rloc(objA, objB):
72     """Description: Copy value of attributes from one rloc object to other
73     Returns: None
74     Params:
75      objA: rloc object of pyangbind generated class
76      objB: rloc object of pyangbind generated class
77     """
78     for name in dir(objB):
79         if name[:5] == "_rloc":
80             value = getattr(objB, name)
81             try:
82                 setattr(objA, name, value)
83             except AttributeError:
84                 print(" %s giving attribute error in" % (name, objA))
85
86
87 def clean_hops(obj):
88     """Description: Clean hop-ids and lrs-bits
89     Returns: python dictionary
90     Params:
91      obj: python dictionary for pyangbind generated object
92     """
93     new_obj = {}
94     for key, value in obj.items():
95         if key == "hop":
96             for hop in value:
97                 values = hop["hop-id"].split(" ")
98                 hop["hop-id"] = values[0] + " " + values[1]
99                 if values[2] != "":
100                     hop["lrs-bits"] = " ".join(values[2:])[:-1]
101                 new_obj[key] = value
102         if isinstance(value, dict):
103             new_obj[key] = clean_hops(value)
104         elif isinstance(value, list):
105             if len(value) > 0 and isinstance(value[0], dict):
106                 cur_items = []
107                 for items in value:
108                     cur_items.append(clean_hops(items))
109                 new_obj[key] = cur_items
110             else:
111                 new_obj[key] = value
112         else:
113             new_obj[key] = value
114     return new_obj
115
116
117 """Generator Functions"""
118
119
120 def Get_LispAddress_Object(eid_string, vni=None, laddr_obj=None):
121     """Description: Returns lisp address object from pyangbind generated classes.
122     Returns: lisp address object
123     Params:
124      eid_string: type of lisp address
125      vni: virtual network id
126      laddr_obj: lisp address object
127     """
128     if laddr_obj is None:
129         sys.path.insert(0, workspace)
130         from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
131             input,
132         )
133
134         rpc_input = input()
135         laddr_obj = rpc_input.mapping_record.eid
136
137     if vni:
138         laddr_obj.virtual_network_id = vni
139
140     eid_string = eid_string.split(":")
141     prefix, text = eid_string[0], ":".join(eid_string[1:])
142     if prefix:
143         if prefix == "srcdst":
144             # Example: srcdst:192.0.2.1/32|192.0.2.2/32
145             laddr_obj.address_type = "laddr:source-dest-key-lcaf"
146             text = text.split("|")
147             laddr_obj.source_dest_key.source = text[0]
148             laddr_obj.source_dest_key.dest = text[1]
149         elif prefix == "no":
150             # Example: no:
151             laddr_obj.address_type = "laddr:no-address-afi"
152         elif prefix == "ipv4":
153             if "/" in text:
154                 # Case: ipv4-prefix
155                 laddr_obj.address_type = "laddr:ipv4-prefix-afi"
156                 laddr_obj.ipv4_prefix = text
157             else:
158                 # Case: ipv4
159                 laddr_obj.address_type = "laddr:ipv4-afi"
160                 laddr_obj.ipv4 = text
161         elif prefix == "ipv6":
162             if "/" in text:
163                 # Case: ipv6-prefix
164                 laddr_obj.address_type = "laddr:ipv6-prefix-afi"
165                 laddr_obj.ipv6_prefix = text
166             else:
167                 laddr_obj.address_type = "laddr:ipv6-afi"
168                 laddr_obj.ipv6 = text
169         elif prefix == "mac":
170             # Example: mac:00:00:5E:00:53:00
171             laddr_obj.address_type = "laddr:mac-afi"
172             laddr_obj.mac = text
173         elif prefix == "dn":
174             # Example: dn:stringAsIs
175             laddr_obj.address_type = "laddr:distinguished-name-afi"
176             laddr_obj.distinguished_name = text
177         elif prefix == "as":
178             # Example: as:AS64500
179             laddr_obj.address_type = "laddr:as-number-afi"
180             laddr_obj.as_number = text
181         elif prefix == "list":
182             # Example: list:{192.0.2.1,192.0.2.2,2001:db8::1}
183             laddr_obj.address_type = "laddr:afi-list-lcaf"
184             list_elements = text[1 : len(text) - 1].split(
185                 ","
186             )  # removed start and end braces
187             laddr_obj.afi_list.address_list = list_elements
188         elif prefix == "appdata":
189             # Example: appdata:192.0.2.1!128!17!80-81!6667-7000
190             laddr_obj.address_type = "laddr:application-data-lcaf"
191             text = text.split("!")
192             laddr_obj.application_data.address = text[0]
193             laddr_obj.application_data.ip_tos = text[1]
194             laddr_obj.application_data.protocol = text[2]
195             local_ports = text[3].split("-")
196             laddr_obj.application_data.local_port_low = local_ports[0]
197             laddr_obj.application_data.local_port_high = local_ports[1]
198             remote_ports = text[4].split("-")
199             laddr_obj.application_data.remote_port_low = remote_ports[0]
200             laddr_obj.application_data.remote_port_high = remote_ports[1]
201         elif prefix == "elp":
202             # TODO: BITS_TYPE_for_lps
203             # Example: elp:{192.0.2.1->192.0.2.2|lps->192.0.2.3}
204             laddr_obj.address_type = "laddr:explicit-locator-path-lcaf"
205             text = text[1 : len(text) - 1]
206             text = text.split("->")  # all the hops
207             for i in range(0, len(text)):
208                 cur_hop = text[i].split("|")
209                 address = cur_hop[0]
210                 lrs_bits = ""
211                 hop_id = "Hop " + str(i + 1) + " " + lrs_bits
212                 if len(cur_hop) > 1:
213                     lps = cur_hop[1]
214                     if "l" in lps:
215                         lrs_bits += "lookup "
216                     if "p" in lps:
217                         lrs_bits += "rloc-probe "
218                     if "s" in lps:
219                         lrs_bits += "strict "
220                 laddr_obj.explicit_locator_path.hop.add(hop_id)
221                 laddr_obj.explicit_locator_path.hop[hop_id].address = address
222         elif prefix == "kv":
223             # Example: kv:192.0.2.1->192.0.2.2
224             laddr_obj.address_type = "laddr:key-value-address-lcaf"
225             text = text.split("->")
226             laddr_obj.key_value_address.key = text[0]
227             laddr_obj.key_value_address.value = text[1]
228         elif prefix == "sp":
229             # Example: sp:42(3)
230             laddr_obj.address_type = "laddr:service-path-lcaf"
231             text = text.split("(")
232             laddr_obj.service_path.service_path_id = text[0]
233             laddr_obj.service_path.service_index = text[1][:-1]
234
235     return laddr_obj
236
237
238 def Get_LispAddress_JSON(eid_string, vni=None):
239     """Description: Returns lisp address dictionary with eid wrapped
240     Returns: python dictionary
241     Params:
242      eid_string: type of lisp address
243      vni: virtual network id
244     """
245     pbj_dump = pbJ.dumps(
246         Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf"
247     )
248     out_dump = '{"eid":' + pbj_dump + "}"
249     return Clean_JSON(out_dump)
250
251
252 def Get_LispAddress_Noeid_JSON(eid_string, vni=None):
253     """Description: Returns lisp address dictionary
254     Returns: python dictionary
255     Params:
256      eid_string: type of lisp address
257      vni: virtual network id
258     """
259     out_dump = pbJ.dumps(
260         Get_LispAddress_Object(eid_string, vni), filter=True, mode="ietf"
261     )
262     return Clean_JSON(out_dump)
263
264
265 def Get_LispAddress_JSON_And_Wrap_input(eid_string, vni=None):
266     """Description: Returns lisp address dictionary with eid and input wrapped
267     Returns: python dictionary
268     Params:
269      eid_string: type of lisp address
270      vni: virtual network id
271     """
272     return Wrap_input(Get_LispAddress_JSON(eid_string, vni))
273
274
275 def Get_LocatorRecord_Object(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"):
276     """Description: Returns locator record object from pyangbind generated classes
277     Returns: locator record object
278     Params:
279      rloc: eid_string for lisp address object
280      weights: priority/weight/multicastPriority/multicastWeight
281      flags: Three bit parameter in the sequence routed->rlocProbed->routed
282      loc_id: id of locator record object
283     """
284     sys.path.insert(0, workspace)
285     from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
286         input,
287     )
288
289     rpc_input = input()
290     lrecord_obj = rpc_input.mapping_record.LocatorRecord
291     # TODO: What should be the locator-id
292     lrecord_obj.add(loc_id)
293     lrecord_ele = weights.split("/")
294     lrecord_obj[loc_id].priority = lrecord_ele[0]
295     lrecord_obj[loc_id].weight = lrecord_ele[1]
296     lrecord_obj[loc_id].multicastPriority = lrecord_ele[2]
297     lrecord_obj[loc_id].multicastWeight = lrecord_ele[3]
298     laddr_obj = lrecord_obj[loc_id].rloc
299     laddr_obj = Get_LispAddress_Object(rloc, laddr_obj=laddr_obj)
300     lrecord_obj[loc_id].localLocator = flags % 10
301     lrecord_obj[loc_id].rlocProbed = (flags / 10) % 10
302     lrecord_obj[loc_id].routed = (flags / 100) % 10
303     return lrecord_obj
304
305
306 def Get_LocatorRecord_JSON(rloc, weights="1/1/255/0", flags=0o01, loc_id="ISP1"):
307     """Description: Returns locator record dictionary
308     Returns: python dictionary
309     Params:
310      rloc: eid_string for lisp address object
311      weights: priority/weight/multicastPriority/multicastWeight
312      flags: Three bit parameter in the sequence routed->rlocProbed->routed
313      loc_id: id of locator record object
314     """
315     pbj_dump = pbJ.dumps(
316         Get_LocatorRecord_Object(rloc, weights, flags, loc_id),
317         filter=True,
318         mode="default",
319     )
320     pbj_dict = json.loads(pbj_dump)
321     pbj_dict[loc_id]["rloc"] = Get_LispAddress_Noeid_JSON(rloc)
322     out_dump = '{"LocatorRecord":' + str(pbj_dict) + "}"
323     return Clean_JSON(out_dump)
324
325
326 def Get_MappingRecord_Object(
327     eid, locators, ttl=1440, authoritative=True, action="NoAction"
328 ):
329     """Description: Returns mapping record object from pyangbind generated classes.
330     Returns: mapping record object
331     Params:
332      eid: lisp address object
333      locators: list of locator record objects
334      ttl: recordTtl
335      authoritative: authoritative
336      action: action
337     """
338     sys.path.insert(0, workspace)
339     from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_mapping.input import (
340         input,
341     )
342
343     rpc_input = input()
344     mrecord_obj = rpc_input.mapping_record
345     mrecord_obj.recordTtl = ttl
346     mrecord_obj.authoritative = authoritative
347     mrecord_obj.action = action
348     copy_eid(mrecord_obj.eid, eid)
349     idx = 0
350     loc_ids = []
351     for loc in locators:
352         loc_id = loc.keys()[0]
353         loc_obj = loc[loc_id]
354         if loc_id in loc_ids:
355             print("Locator objects should have different keys")
356             break
357         # TODO: Locator-id, currently in the format of loc_id0, loc_id1
358         mrecord_obj.LocatorRecord.add(loc_id)
359         mrecord_loc_obj = mrecord_obj.LocatorRecord[loc_id]
360         mrecord_loc_obj.priority = loc_obj.priority
361         mrecord_loc_obj.weight = loc_obj.weight
362         mrecord_loc_obj.multicastPriority = loc_obj.multicastPriority
363         mrecord_loc_obj.multicastWeight = loc_obj.multicastWeight
364         copy_rloc(mrecord_loc_obj.rloc, loc_obj.rloc)
365         mrecord_loc_obj.localLocator = loc_obj.localLocator
366         mrecord_loc_obj.rlocProbed = loc_obj.rlocProbed
367         mrecord_loc_obj.routed = loc_obj.routed
368         idx += 1
369     return mrecord_obj
370
371
372 def Get_MappingRecord_JSON(
373     eid, locators, ttl=1440, authoritative=True, action="NoAction"
374 ):
375     """Description: Returns mapping record dictionary
376     Returns: python dictionary
377     Params:
378      eid: lisp address object
379      locators: list of locator record objects
380      ttl: recordTtl
381      authoritative: authoritative
382      action: action
383     """
384     pbj_dump = pbJ.dumps(
385         Get_MappingRecord_Object(eid, locators, ttl, authoritative, action),
386         filter=True,
387         mode="ietf",
388     )
389     out_dump = '{"mapping-record":' + pbj_dump + "}"
390     return Clean_JSON(out_dump)
391
392
393 def Get_MappingAuthkey_Object(key_string="password", key_type=1):
394     """Description: Returns mapping auth key object from pyangbind generated classes.
395     Returns: mapping auth key object
396     Params:
397      key_string: key string
398      key_type: key type
399     """
400     sys.path.insert(0, workspace)
401     from LISPFlowMappingYANGBindings.odl_mappingservice_rpc.add_key.input import (
402         input as add_key_input,
403     )
404
405     rpc_input = add_key_input()
406     authkey_obj = rpc_input.mapping_authkey
407     authkey_obj.key_string = key_string
408     authkey_obj.key_type = key_type
409     return authkey_obj
410
411
412 def Get_MappingAuthkey_JSON(key_string="password", key_type=1):
413     """Description: Returns mapping auth key dictionary
414     Returns: python dictionary
415     Params:
416      key_string: key string
417      key_type: key type
418     """
419     pbj_dump = pbJ.dumps(
420         Get_MappingAuthkey_Object(key_string, key_type), filter=True, mode="default"
421     )
422     out_dump = '{"mapping-authkey":' + pbj_dump + "}"
423     return Clean_JSON(out_dump)