BUG-197: improve session ID tracking
[bgpcep.git] / pcep / pcepy / peer / pcc.py
1 # PCC and its handlers
2
3 # Copyright (c) 2012,2013 Cisco Systems, Inc. and others.  All rights reserved.
4 #
5 # This program and the accompanying materials are made available under the
6 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 # and is available at http://www.eclipse.org/legal/epl-v10.html
8
9
10 from . import base
11 from . import lsp as _lsp
12 from pcepy import session as _session
13 from pcepy import message as _message
14
15 class Pcc(base.Peer):
16     """A simulated Path Computation Client"""
17
18     CONFIG_SESSION_CONFIG = 'pcc.session_config'
19
20     def _create_handlers(self):
21         super(Pcc, self)._create_handlers()
22         for handler_class in (Updater,):
23             self.add_handler(self._create_handler(handler_class))
24
25     def create_session(self, local, remote):
26         """Create PCEP client session to remote node and put it on the bus."""
27         key = Pcc.CONFIG_SESSION_CONFIG
28         session_config = self[key]
29         if session_config:
30             session_config = dict(session_config)
31         else:
32             session_config = dict()
33         session_remote_config = self[key, remote.name]
34         if session_remote_config:
35             session_config.update(session_remote_config)
36
37         pcep_session = _session.PcepClient(
38             self, local, remote, session_config
39         )
40         self._sessions.append(pcep_session)
41         self.context.bus.add(pcep_session)
42         return pcep_session
43
44
45 class Replyer(base.Handler):
46     """Manage reception of PCRep messages."""
47     # May be implemented later
48     pass
49
50
51 class Updater(base.Handler):
52     """Manage reception of PCUpd messages.
53
54     Manage state database (lsp.UpdateRequest). Await specific update requests.
55
56     The Updater emits these events to its peer:
57         on_update_request(session, lsp, request):
58             An update request has arrived; called before adding to updates.
59             If lsp is None, it is now known.
60
61         on_await_update(session, key, arrived):
62             Update request for key has arrived (with update) or timed out (None).
63     """
64
65     CONFIG_SYNC_SEPARATELY = 'updater.sync_separately'
66
67     STATE_STATEDB = '_updater.statedb'
68     STATE_AWAITED = '_updater.awaited'
69
70     def on_session_open(self, peer, eventargs):
71         session = eventargs['session']
72         if session[base.Opener.STATE_PCEPTYPE] == base.Opener.PCEPTYPE_STATELESS:
73             return
74         statedb = self._get_statedb(peer, session)
75         session[Updater.STATE_STATEDB] = statedb
76         self._make_lsps(peer, session, statedb)
77         session[Updater.STATE_AWAITED] = self._get_awaited(peer, session)
78
79         local_open = session[base.Opener.STATE_LOCAL_OPEN]
80         remote_open = session[base.Opener.STATE_REMOTE_OPEN]
81         avoid = statedb.can_avoid(pcc_open=local_open, pce_open=remote_open)
82         if avoid:
83             base._LOGGER.info('Session "%s" has valid database version "%s"'
84                 % (session, statedb.version)
85             )
86         else:
87             self._send_state_sync(peer, session)
88
89     def on_message(self, peer, eventargs):
90         session = eventargs['session']
91         statedb = session[Updater.STATE_STATEDB]
92         if statedb is None:
93             return
94         message = eventargs['message']
95         if not isinstance(message, _message.PCUpd):
96             return
97         awaited = session[Updater.STATE_AWAITED]
98
99         if session[base.Opener.STATE_PCEPTYPE] != base.Opener.PCEPTYPE_STATEFULA:
100             peer.make_pcep_error(
101                 origin = self,
102                 session = session,
103                 cause = message,
104                 send = _message.code.Error.InvalidOperation_DelegationNotActive,
105                 closing = False,
106             )
107             return
108
109         for update in message.poll('update'):
110             if not update.have('lsp'):
111                 peer.make_pcep_error(
112                     origin = self,
113                     session = session,
114                     cause = update,
115                     send = _message.code.Error.MandatoryObjectMissing_LSP,
116                     closing = False,
117                 )
118                 continue
119             lsp_id = update.poll('lsp').lsp_id
120             lsp = statedb[lsp_id]
121             peer.emit('on_update_request', session=session,
122                 lsp_id = lsp_id, lsp = lsp, update = update
123             )
124             if lsp is not None:
125                 lsp.update = update
126
127             for key in awaited.match(update):
128                 peer.emit('on_await_update', session=session,
129                     key = key, arrived = update
130                 )
131
132     def on_timeout(self, peer, eventargs):
133         session = eventargs['session']
134         now = eventargs['now']
135         awaited = session[Updater.STATE_AWAITED]
136         if awaited is None:
137             return
138         outs = awaited.out(now)
139         for out in outs:
140             peer.emit('on_await_report', session=session,
141                 key = out, arrived = None
142             )
143
144     def timeout(self, session):
145         awaited = session[Updater.STATE_AWAITED]
146         return None if awaited is None else awaited.timeout
147
148     def _get_statedb(self, peer, session):
149         """Create state database for session"""
150         return _lsp.StateDb(self, peer, session)
151
152     def _get_awaited(self, peer, session):
153         """Create awaited update database for session"""
154         return _lsp.Awaited(self, session)
155
156     def _make_lsps(self, peer, session, statedb):
157         """Populate the state database on session open.
158         Should be implemented by a subclass.
159         """
160         pass
161
162     def _make_report(self, peer, session, lsp):
163         """Create a lsp.Report object for lsp or None."""
164         pass
165
166     def _send_state_sync(self, peer, session):
167         """Send state reports from current database."""
168         statedb = session[Updater.STATE_STATEDB]
169
170         reports = [ self._make_report(peer, session, lsp) for lsp in statedb ]
171         reports = [ report for report in reports if report is not None ]
172         count = len(reports)
173         if not count:
174             return
175
176         use_dbv = session[base.Opener.STATE_USE_DBV]
177
178         for report in reports:
179             lsp = report.poll('lsp')
180             if lsp is not None: # possibly sending bad report
181                 lsp.synchronize = True
182                 if use_dbv:
183                     statedb.put_version(lsp)
184
185         if self[Updater.CONFIG_SYNC_SEPARATELY]:
186             for report in reports:
187                 message = _message.PCRpt()
188                 message.push('report', report)
189                 message.pack()
190                 session.send(message)
191         else:
192             message = _message.PCRpt()
193             message.push('report', reports)
194             message.pack()
195             session.send(message)
196
197     def await(self, session, criteria, timeout=None):
198         """Watch for a [set of] update requests satisfying criteria."""
199         awaited = session[Updater.STATE_AWAITED]
200         for key, criterion in criteria.items():
201             awaited.add(self._get_await(session, key, criterion, timeout))
202
203     def _get_await(self, session, key, criterion, timeout=None):
204         """Transform a criterium into an Await object."""
205         return _lsp.Await(key, criterion, criterion.get('timeout', timeout))
206