3 # Copyright (c) 2012,2013 Cisco Systems, Inc. and others. All rights reserved.
5 # This program and the accompanying materials are made available under the
6 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 # and is available at http://www.eclipse.org/legal/epl-v10.html
11 from . import lsp as _lsp
12 from pcepy import session as _session
13 from pcepy import message as _message
16 """A simulated Path Computation Client"""
18 CONFIG_SESSION_CONFIG = 'pcc.session_config'
def _create_handlers(self):
    """Run the parent's handler setup, then register the PCC handlers.

    One handler is created and added per class in the local tuple
    (currently only Updater).
    """
    super(Pcc, self)._create_handlers()
    pcc_handler_classes = (Updater,)
    for cls in pcc_handler_classes:
        handler = self._create_handler(cls)
        self.add_handler(handler)
# Builds the configuration for a new client session, creates the session,
# records it in self._sessions and adds it to the context bus.
# NOTE(review): the embedded line numbers skip 29, 31, 36 and 39, so this copy
# is missing statements (presumably the if/else choosing between the two
# dict() assignments and the closing parenthesis of the PcepClient call).
# Whether anything followed line 41 (e.g. a return) cannot be told from here;
# confirm against the original file.
25 def create_session(self, local, remote):
26 """Create PCEP client session to remote node and put it on the bus."""
# Base configuration is looked up under Pcc.CONFIG_SESSION_CONFIG.
27 key = Pcc.CONFIG_SESSION_CONFIG
28 session_config = self[key]
# Either a copy of the configured mapping or a fresh empty dict is used,
# so the update() below never mutates the stored configuration.
30 session_config = dict(session_config)
32 session_config = dict()
# Per-remote settings are looked up by (key, remote.name) and, when truthy,
# layered on top of the base configuration.
33 session_remote_config = self[key, remote.name]
34 if session_remote_config:
35 session_config.update(session_remote_config)
37 pcep_session = _session.PcepClient(
38 self, local, remote, session_config
40 self._sessions.append(pcep_session)
41 self.context.bus.add(pcep_session)
class Replyer(base.Handler):
    """Handler reserved for the reception of PCRep messages.

    Not implemented yet; it may be filled in later. For now the class adds
    nothing on top of base.Handler.
    """
51 class Updater(base.Handler):
52 """Manage reception of PCUpd messages.
54 Manage state database (lsp.UpdateRequest). Await specific update requests.
56 The Updater emits these events to its peer:
57 on_update_request(session, lsp_id, lsp, update):
58 An update request has arrived; called before adding to updates.
59 If lsp is None, it is not known.
61 on_await_update(session, key, arrived):
62 Update request for key has arrived (with update) or timed out (None).
# Configuration key: when its value is truthy, _send_state_sync sends each
# report in its own PCRpt message instead of one message with all reports.
65 CONFIG_SYNC_SEPARATELY = 'updater.sync_separately'
# Session keys under which on_session_open stores the state database and
# the registry of awaited update requests.
67 STATE_STATEDB = '_updater.statedb'
68 STATE_AWAITED = '_updater.awaited'
# Session-open handler: creates the per-session state database and awaited
# registry, stores both on the session, and calls _send_state_sync.
# NOTE(review): the embedded line numbers skip 73, 78, 82 and 85-86, so this
# copy is missing statements (presumably the body of the stateless check, a
# condition on `avoid`, and the end of the logging call). Do not rely on the
# control flow shown here without checking the original file.
70 def on_session_open(self, peer, eventargs):
71 session = eventargs['session']
# Compares the session's PCEP type with the stateless type; the branch body
# is not visible in this copy.
72 if session[base.Opener.STATE_PCEPTYPE] == base.Opener.PCEPTYPE_STATELESS:
# The database is created, stored under STATE_STATEDB, then handed to the
# _make_lsps hook to be populated.
74 statedb = self._get_statedb(peer, session)
75 session[Updater.STATE_STATEDB] = statedb
76 self._make_lsps(peer, session, statedb)
77 session[Updater.STATE_AWAITED] = self._get_awaited(peer, session)
# Both Open messages stored on the session are passed to statedb.can_avoid();
# presumably the result says whether state synchronization can be skipped
# (TODO confirm against lsp.StateDb).
79 local_open = session[base.Opener.STATE_LOCAL_OPEN]
80 remote_open = session[base.Opener.STATE_REMOTE_OPEN]
81 avoid = statedb.can_avoid(pcc_open=local_open, pce_open=remote_open)
83 base._LOGGER.info('Session "%s" has valid database version "%s"'
84 % (session, statedb.version)
87 self._send_state_sync(peer, session)
# Message handler for PCUpd: for every 'update' item in the message it looks
# up the referenced LSP, emits 'on_update_request' to the peer, and emits
# 'on_await_update' for every awaited key the update matches.
# NOTE(review): many embedded line numbers are skipped (92-93, 96, 98,
# 101-103, 105-108, 112-114, 116-118, 123-126, 130-131). The early exits, most
# arguments of both make_pcep_error calls and the closing parentheses are not
# visible in this copy; confirm against the original file.
89 def on_message(self, peer, eventargs):
90 session = eventargs['session']
91 statedb = session[Updater.STATE_STATEDB]
94 message = eventargs['message']
# Only PCUpd messages are of interest; the branch body is not visible here.
95 if not isinstance(message, _message.PCUpd):
97 awaited = session[Updater.STATE_AWAITED]
# A session whose PCEP type is not PCEPTYPE_STATEFULA gets a PCEP error built
# with the InvalidOperation_DelegationNotActive code.
99 if session[base.Opener.STATE_PCEPTYPE] != base.Opener.PCEPTYPE_STATEFULA:
100 peer.make_pcep_error(
104 send = _message.code.Error.InvalidOperation_DelegationNotActive,
109 for update in message.poll('update'):
# An update without an 'lsp' item is answered with MandatoryObjectMissing_LSP.
110 if not update.have('lsp'):
111 peer.make_pcep_error(
115 send = _message.code.Error.MandatoryObjectMissing_LSP,
# The LSP is looked up in the state database by the id carried in the update;
# per the class docstring the listener may receive lsp=None.
119 lsp_id = update.poll('lsp').lsp_id
120 lsp = statedb[lsp_id]
121 peer.emit('on_update_request', session=session,
122 lsp_id = lsp_id, lsp = lsp, update = update
# Every awaited key matched by this update is reported with the update itself.
127 for key in awaited.match(update):
128 peer.emit('on_await_update', session=session,
129 key = key, arrived = update
def on_timeout(self, peer, eventargs):
    """Notify the peer about awaited update requests that have timed out.

    Reads 'session' and 'now' from eventargs, asks the session's awaited
    registry which keys have expired by `now`, and emits 'on_await_update'
    with arrived=None for each of them. This is the event the Updater
    documents for timeouts and the same one on_message emits on arrival.
    """
    session = eventargs['session']
    now = eventargs['now']
    awaited = session[Updater.STATE_AWAITED]
    # A session without an awaited registry has nothing that can time out;
    # timeout() tolerates the same None value.
    if awaited is None:
        return
    for out in awaited.out(now):
        # Timeouts use the same event as arrivals and are told apart by
        # arrived=None. The event name was 'on_await_report' before, which
        # the Updater never documents.
        peer.emit('on_await_update', session=session,
                  key=out, arrived=None)
def timeout(self, session):
    """Return the awaited registry's timeout for session.

    Returns None when the session stores None under STATE_AWAITED.
    """
    registry = session[Updater.STATE_AWAITED]
    if registry is None:
        return None
    return registry.timeout
def _get_statedb(self, peer, session):
    """Build a new lsp.StateDb from this handler, the peer and the session."""
    statedb = _lsp.StateDb(self, peer, session)
    return statedb
def _get_awaited(self, peer, session):
    """Build a new lsp.Awaited registry for session.

    The peer argument is accepted but not passed on to lsp.Awaited.
    """
    registry = _lsp.Awaited(self, session)
    return registry
# Hook called by on_session_open right after the new state database has been
# stored on the session.
# NOTE(review): embedded line numbers 159-160 are missing, so the closing
# docstring quotes and the method body are not visible in this copy.
156 def _make_lsps(self, peer, session, statedb):
157 """Populate the state database on session open.
158 Should be implemented by a subclass.
# Called by _send_state_sync once per entry of the state database; results
# that are None are dropped there before anything is sent.
# NOTE(review): the method body (embedded line 164) is missing from this copy.
162 def _make_report(self, peer, session, lsp):
163 """Create a lsp.Report object for lsp or None."""
# Builds one report per entry of the session's state database, marks the LSP
# of each report as synchronizing, and sends the reports in PCRpt messages.
# NOTE(review): embedded line numbers 169, 172-175, 177, 182, 184, 189, 191
# and 194 are missing. The statements between building the report list and
# reading use_dbv, the use of `use_dbv`, and the `else:` that presumably
# separates the two sending strategies are not visible in this copy; confirm
# against the original file.
166 def _send_state_sync(self, peer, session):
167 """Send state reports from current database."""
168 statedb = session[Updater.STATE_STATEDB]
# _make_report may return None for an entry; those entries are filtered out.
170 reports = [ self._make_report(peer, session, lsp) for lsp in statedb ]
171 reports = [ report for report in reports if report is not None ]
176 use_dbv = session[base.Opener.STATE_USE_DBV]
178 for report in reports:
179 lsp = report.poll('lsp')
180 if lsp is not None: # possibly sending bad report
181 lsp.synchronize = True
183 statedb.put_version(lsp)
# With CONFIG_SYNC_SEPARATELY set, every report goes out in its own PCRpt.
185 if self[Updater.CONFIG_SYNC_SEPARATELY]:
186 for report in reports:
187 message = _message.PCRpt()
188 message.push('report', report)
190 session.send(message)
# Otherwise a single PCRpt carrying the whole list of reports is sent.
192 message = _message.PCRpt()
193 message.push('report', reports)
195 session.send(message)
197 def await(self, session, criteria, timeout=None):
198 """Watch for a [set of] update requests satisfying criteria."""
199 awaited = session[Updater.STATE_AWAITED]
200 for key, criterion in criteria.items():
201 awaited.add(self._get_await(session, key, criterion, timeout))
def _get_await(self, session, key, criterion, timeout=None):
    """Wrap a criterion in an lsp.Await object.

    A 'timeout' entry in the criterion takes precedence over the timeout
    argument.
    """
    effective_timeout = criterion.get('timeout', timeout)
    return _lsp.Await(key, criterion, effective_timeout)