Bug-2226: RFC5886 yang model
[bgpcep.git] / pcep / pcepy / message / tlv.py
1 # PCEP TLV definitions
2
3 # Copyright (c) 2012,2013 Cisco Systems, Inc. and others.  All rights reserved.
4 #
5 # This program and the accompanying materials are made available under the
6 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 # and is available at http://www.eclipse.org/legal/epl-v10.html
8
9 from . import data
10 from . import base
11 from . import code
12
13 # RFC 5440 PCEP
14 class NoPathVector(base.Tlv):
15     type_id = 1
16     unavailable = data.Flag(offset=31)
17     destination = data.Flag(offset=30)
18     source = data.Flag(offset=29)
19     pks_failure = data.Flag(offset=27) # RFC 5520 PK
20     p2mp_reachability = data.Flag(offset=24) # RFC 6006 P2MP
21
22
23 # RFC 5440 PCEP
24 class OverloadedDuration(base.Tlv):
25     type_id = 2
26     duration = data.Int(offset=0, size=32)
27
28
29 # RFC 5440 PCEP
30 class RequestMissing(base.Tlv):
31     type_id = 3
32     rp_id = data.Int(offset=0, size=32)
33
34 # RFC 5541 OF
35 class OfList(base.Tlv):
36     type_id = 4
37     _fixed = False
38     __of_code_sup = 1 << 16
39
40     def __init__(self, clone=None, **updates):
41         self._of_codes = list()
42         super(OfList, self).__init__(clone=clone)
43         if clone is None:
44             self._header.length = 0
45         else:
46             self._of_codes.extend(clone.of_codes)
47         if updates:
48             self.update(updates)
49
50     @property
51     def of_codes(self):
52         return self._of_codes
53
54     @of_codes.setter
55     def of_codes(self, of_codes):
56         if isinstance(of_codes, (list, tuple)):
57             self._of_codes.extend(of_codes)
58         else:
59             self._of_codes.append(of_codes)
60
61     def _get_size(self):
62         size = 2 * len(self._of_codes)
63         self._header.length = size
64         return self._header.size + data.padlen(size, 4)
65
66     def update(self, updates):
67         updated = super(OfList, self).update(updates)
68         if 'of_codes' in updates:
69             self.of_codes = updates['of_codes']
70             updated += 1
71         return updated
72
73     def read(self, buf, off, max_end):
74         off = super(OfList, self).read(buf, off, max_end)
75         end = off + data.padlen(self._header.length, 4)
76         if self._header.length % 2:
77             _errmsg = ('OfListTlv length (%s) is not even'
78                 % self._header.length
79             )
80             raise data.SizeError(_errmsg)
81         if end > max_end:
82             _errmsg = ('OfListTlv length (%s) exceeds limit [%s:%s]'
83                 % (self._header.length, off, max_end)
84             )
85             raise data.SizeError(_errmsg)
86         of_codes = self._of_codes
87         list_end = off + self._header.length
88         while off < list_end:
89             of_codes.append(buf[off] << 8 | buf[off+1])
90             off += 2
91         return end
92
93     def write(self, buf, off):
94         off = super(OfList, self).write(buf, off)
95         end = off + data.padlen(2 * len(self._of_codes), 4)
96         for of_code in self._of_codes:
97             if of_code >= OfList.__of_code_sup:
98                 base._LOGGER.error('OF Code (%d) cannot fit 16 bits' % of_code)
99             buf[off+1] = of_code & 0xFF
100             of_code >>= 8
101             buf[off] = of_code & 0xFF
102             off += 2
103         return end
104
105
106 # D STATEFUL
107 class StatefulPcepCapability(base.Tlv):
108     type_id = 16
109     updating = data.Flag(offset=31)
110     include_db_version = data.Flag(offset=30)
111     instantiation = data.Flag(offset=29)
112
113
114 # D STATEFUL
115 class LspSymbolicName(base.Tlv):
116     type_id = 17
117     _fixed = False
118
119     def __init__(self, clone=None, **updates):
120         self._lsp_name = b''
121         self._lsp_name_length = 0
122         super(LspSymbolicName, self).__init__(clone=clone)
123         if clone is None:
124             self._header.length = 0
125         else:
126             self.lsp_name = clone.lsp_name
127         if updates:
128             self.update(updates)
129
130     @property
131     def lsp_name(self):
132         if self._lsp_name_length < len(self._lsp_name):
133             return self._lsp_name[:self._lsp_name_length]
134         return self._lsp_name
135
136     @property
137     def padded_lsp_name(self):
138         return self._lsp_name
139
140     @lsp_name.setter
141     def lsp_name(self, lsp_name):
142         lsp_name = bytes(lsp_name)
143         self._lsp_name_length = len(lsp_name)
144         self._header.length = self._lsp_name_length
145         self._lsp_name = data.padded(lsp_name, 4)
146         self._check_lsp_name()
147
148     def _get_size(self):
149         return self._header.size + len(self._lsp_name)
150
151     def update(self, updates):
152         updated = super(LspSymbolicName, self).update(updates)
153         if 'lsp_name' in updates:
154             self.lsp_name = updates['lsp_name']
155             updated += 1
156         return updated
157
158     def read(self, buf, off, max_end):
159         off = super(LspSymbolicName, self).read(buf, off, max_end)
160         end = off + data.padlen(self._header.length, 4)
161         if end > max_end:
162             _errmsg = ('LspSymbolicNameTlv length (%s) exceeds limit [%s:%s]'
163                 % (self._header.length, off, max_end)
164             )
165             raise data.SizeError(_errmsg)
166         self._lsp_name = buf[off:end]
167         self._lsp_name_length = self._header.length
168         base._LOGGER.debug('Reading lsp_name from <%s>[%s:%s] = "%s" + "%s"'
169             % (id(buf), off, end,
170                 self._lsp_name[:self._lsp_name_length],
171                 data.to_hex(self._lsp_name[self._lsp_name_length:]),
172             )
173         )
174         self._check_lsp_name()
175         return end
176
177     def write(self, buf, off):
178         self._check_lsp_name()
179         off = super(LspSymbolicName, self).write(buf, off)
180         end = off + len(self._lsp_name)
181         buf[off:end] = self._lsp_name
182         return end
183
184     def _check_lsp_name(self):
185         fails = list()
186         nlen = self._lsp_name_length
187         plen = len(self._lsp_name)
188         if nlen <= 0 or nlen > plen:
189             fails.append('length %d outside range(%d)' % (nlen, plen))
190             nlen = min(max(nlen, 0), plen)
191         if not bytes(self._lsp_name[0:nlen]).replace(b'_', b'').isalnum():
192             fails.append('invalid character in name')
193         if bytes(self._lsp_name[nlen:plen]).replace(b'\0', b''):
194             fails.append('padding contains set bits')
195         if fails:
196             base._LOGGER.warning('LSP name "%s" check: %s'
197                 % (data.to_hex(self._lsp_name), '; '.join(fails))
198             )
199         return fails
200
201     def __str__(self):
202         return '%slsp_name="%s"' % (
203             super(LspSymbolicName, self).__str__(),
204             self.lsp_name
205         )
206
207
208 # D STATEFUL
209 class Ipv4LspIdentifiers(base.Tlv):
210     type_id = 18
211     sender = data.Ipv4(offset=0)
212     lsp_id = data.Int(offset=32, size=16)
213     tunnel_id = data.Int(offset=32+16, size=16)
214     extended_tunnel_id = data.Int(offset=32+32, size=32)
215
216 # D STATEFUL
217 class Ipv6LspIdentifiers(base.Tlv):
218     type_id = 19
219     sender = data.Ipv6(offset=0)
220     lsp_id = data.Int(offset=128, size=16)
221     tunnel_id = data.Int(offset=128+16, size=16)
222     extended_tunnel_id = data.Int(offset=128+32, size=32)
223
224 LspIdentifiers = Ipv4LspIdentifiers, Ipv6LspIdentifiers
225
226
227 # D STATEFUL
228 class LspUpdateErrorCode(base.Tlv):
229     type_id = 20
230     lsp_update_error_code = data.Int(offset=0, size=32)
231
232     @property
233     def code(self):
234         return code.LspUpdateError.from_code(self.lsp_update_error_code)
235
236     @code.setter
237     def code(self, code):
238         self.lsp_update_error_code = code.lsp_update_error_code
239
240     def __str__(self):
241         return str(self.code)
242
243
244 # D STATEFUL
245 class Ipv4RsvpErrorSpec(base.Tlv):
246     type_id = 21
247     error_node = data.Ipv4(offset=0)
248     in_place = data.Flag(offset=32+7)
249     not_guilty = data.Flag(offset=32+6)
250     error_code = data.Int(offset=32+8, size=8)
251     error_value = data.Int(offset=32+16, size=16)
252
253 # D STATEFUL
254 class Ipv6RsvpErrorSpec(base.Tlv):
255     type_id = 22
256     error_node = data.Ipv6(offset=0)
257     in_place = data.Flag(offset=128+7)
258     not_guilty = data.Flag(offset=128+6)
259     error_code = data.Int(offset=128+8, size=8)
260     error_value = data.Int(offset=128+16, size=16)
261
262 RsvpErrorSpec = Ipv4RsvpErrorSpec, Ipv6RsvpErrorSpec
263
264
265 # D STATEFUL
266 class LspStateDbVersion(base.Tlv):
267     type_id = 23
268     db_version = data.Int(offset=0, size=64)
269
270 # D STATEFUL
271 class NodeIdentifier(base.Tlv):
272     type_id = 24
273     _fixed = False
274
275     def __init__(self, clone=None, **updates):
276         self._node_id = b''
277         self._node_id_length = 0
278         super(NodeIdentifier, self).__init__(clone=clone)
279         if clone is None:
280             self._header.length = 0
281         else:
282             self.node_id = clone.node_id
283         if updates:
284             self.update(updates)
285
286     @property
287     def node_id(self):
288         if self._node_id_length < len(self._node_id):
289             return self._node_id[:self._node_id_length]
290         return self._node_id
291
292     @property
293     def padded_node_id(self):
294         return self._node_id
295
296     @node_id.setter
297     def node_id(self, node_id):
298         node_id = bytes(node_id)
299         self._node_id_length = len(node_id)
300         self._header.length = self._node_id_length
301         self._node_id = data.padded(node_id, 4)
302
303     def _get_size(self):
304         return self._header.size + len(self._node_id)
305
306     def update(self, updates):
307         updated = super(NodeIdentifier, self).update(updates)
308         if 'node_id' in updates:
309             self.node_id = updates['node_id']
310             updated += 1
311         return updated
312
313     def read(self, buf, off, max_end):
314         off = super(NodeIdentifier, self).read(buf, off, max_end)
315         end = off + data.padlen(self._header.length, 4)
316         if end > max_end:
317             base._LOGGER.error("Tlv value length (%s) exceeds limit [%s:%s]"
318                 % (self._header.length, off, max_end)
319             )
320             end = max_end
321         self._node_id = buf[off:end]
322         self._node_id_length = self._header.length
323         base._LOGGER.debug('Reading node_id from <%s>[%s:%s] = "%s"'
324             % (id(buf), off, end, data.to_hex(self._node_id))
325         )
326         return end
327
328     def write(self, buf, off):
329         base._LOGGER.warn("Writing NodeIdentifier [%s]" % self._header)
330         off = super(NodeIdentifier, self).write(buf, off)
331         end = off + len(self._node_id)
332         buf[off:end] = self._node_id
333         return end
334
335     def __str__(self):
336         return '%snode_id="%s"' % (
337             super(NodeIdentifier, self).__str__(),
338             data.to_hex(self._node_id)
339         )
340
341
342 class Unknown(base.Tlv):
343     """TLV with unknown type and arbitrary value."""
344     _fixed = False
345
346     def __init__(self, clone=None, **updates):
347         self._octets = b''
348         self._octets_length = 0
349         super(Unknown, self).__init__(clone=clone)
350         if clone is None:
351             self._header.length = 0
352         else:
353             self.octets = clone.octets
354         if updates:
355             self.update(updates)
356
357     @property
358     def octets(self):
359         if self._octets_length < len(self._octets):
360             return self._octets[:self._octets_length]
361         return self._octets
362
363     @property
364     def padded_octets(self):
365         return self._octets
366
367     @octets.setter
368     def octets(self, octets):
369         octets = bytes(octets)
370         self._octets_length = len(octets)
371         self._header.length = self._octets_length
372         self._octets = data.padded(octets, 4)
373
374     def _get_size(self):
375         return self._header.size + len(self._octets)
376
377     def update(self, updates):
378         updated = super(Unknown, self).update(updates)
379         if 'octets' in updates:
380             self.octets = updates['octets']
381             updated += 1
382         return updated
383
384     def read(self, buf, off, max_end):
385         off = super(Unknown, self).read(buf, off, max_end)
386         end = off + data.padlen(self._header.length, 4)
387         if end > max_end:
388             base._LOGGER.error("Tlv value length (%s) exceeds limit [%s:%s]"
389                 % (self._header.length, off, max_end)
390             )
391             end = max_end
392         self._octets = buf[off:end]
393         self._octets_length = self._header.length
394         base._LOGGER.debug('Reading octets from <%s>[%s:%s] = "%s"'
395             % (id(buf), off, end, data.to_hex(self._octets))
396         )
397         return end
398
399     def write(self, buf, off):
400         base._LOGGER.warn("Writing Unknown [%s]" % self._header)
401         off = super(Unknown, self).write(buf, off)
402         end = off + len(self._octets)
403         buf[off:end] = self._octets
404         return end
405
406     def __str__(self):
407         return '%soctets="%s"' % (
408             super(Unknown, self).__str__(),
409             data.to_hex(self._octets)
410         )
411
412 base.Tlv.unknown_class = Unknown
413
414