3 # Copyright (c) 2012,2013 Cisco Systems, Inc. and others. All rights reserved.
5 # This program and the accompanying materials are made available under the
6 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 # and is available at http://www.eclipse.org/legal/epl-v10.html
11 import socket as _socket
14 from . import lsp as _lsp
15 from pcepy import session as _session
16 from pcepy import message as _message
19 """A simulated Path Computation Element"""
21 CONFIG_SERVER_CONFIG = 'pce.server_config'
22 CONFIG_SESSION_CONFIG = 'pce.session_config'
24 def __init__(self, name, context):
25 super(Pce, self).__init__(name, context)
26 self._servers = list()
28 def _get_active(self):
29 return bool(self._servers) or super(Pce, self)._get_active()
31 def _create_handlers(self):
32 super(Pce, self)._create_handlers()
33 for handler_class in (Listener, Reporter,):
34 self.add_handler(self._create_handler(handler_class))
36 def create_server(self, node):
37 """Bind to a server socket specified by node and put it on the bus."""
38 key = Pce.CONFIG_SERVER_CONFIG
39 server_config = self[key]
41 server_config = dict(server_config)
43 server_config = dict()
44 server_node_config = self[key, node.name]
45 if server_node_config:
46 server_config.update(server_node_config)
49 node = node.with_port(_session.PCEP_PORT)
50 server = _session.PcepServer(self, node, server_config)
51 self._servers.append(server)
52 self.context.bus.add(server)
55 def create_session(self, local, socket, remote):
56 """Create PCEP session from socket and put it on the bus."""
57 key = Pce.CONFIG_SESSION_CONFIG
58 session_config = self[key]
60 session_config = dict(session_config)
62 session_config = dict()
63 session_remote_config = self[key, remote.name]
64 if session_remote_config:
65 session_config.update(session_remote_config)
67 pcep_session = _session.PcepAccept(
68 self, local, socket, remote, session_config
70 self._sessions.append(pcep_session)
71 self.context.bus.add(pcep_session)
75 "Ask all server sessions to close. Then close all incoming sessions"
76 for server in self._servers:
78 super(Pce, self).shutdown()
81 class Listener(base.Handler):
82 """Manage incoming connections on PceServer sockets"""
84 CONFIG_TIMEOUT = 'pcep_server.timeout'
85 STATE_TIMEOUT = '_listener.timeout'
87 def on_open(self, peer, eventargs):
88 server = eventargs['session']
89 if not server.is_server():
91 if Listener.CONFIG_TIMEOUT not in server:
92 timeout = peer[Listener.CONFIG_TIMEOUT]
94 server[Listener.STATE_TIMEOUT] = _session.resolve_timeout(timeout)
96 def on_close(self, peer, eventargs):
97 server = eventargs['session']
98 if not server.is_server():
100 peer._servers.remove(server)
102 def on_connection(self, peer, eventargs):
103 # TODO: restrict addresses by config
104 base._LOGGER.debug('Creating accepted session on peer %s' % peer)
105 server = eventargs['server']
106 socket = eventargs['socket']
107 address, port = eventargs['address'][:2]
108 address = peer.context.address_from(address)
109 node = peer.context.get_node(_session.Node.ROLE_PCC,
110 address=address, port=port
112 session = peer.create_session(server.local, socket, node)
113 base._LOGGER.debug('Created accepted session %s' % session)
114 del server[Listener.CONFIG_TIMEOUT] #TODO: only if not expecting more
116 def on_timeout(self, peer, eventargs):
117 session = eventargs['session']
118 now = eventargs['now']
120 timeout = session[Listener.STATE_TIMEOUT]
121 if not timeout or timeout > now:
124 del session[Listener.STATE_TIMEOUT]
125 peer.emit('on_socket_error', session=session,
126 error=_socket.timeout('%s: timed out' % session)
128 session.closing = True
130 def timeout(self, session):
131 if session.is_server():
132 return session[Listener.STATE_TIMEOUT]
135 class Requester(base.Handler):
136 """Manage reception of PCReq messages."""
137 # May be implemented later
141 class Reporter(base.Handler):
142 """Manage reception of PCRpt messages.
144 Manage state database (lsp.StateReports). Await specific state reports.
146 The Reporter emits these events to its peer:
147 on_state_report(session, lsp, report, new):
148 A [new] state report has arrived; called before adding to statedb.
150 on_synchronized(session, statedb):
151 State synchronization has completed and recorded in lsp.Reports.
153 on_await_report(session, key, arrived):
154 State report for key has arrived (with report) or timed out (None).
157 STATE_STATEDB = '_reporter.statedb'
158 STATE_AWAITED = '_reporter.awaited'
159 STATE_STATE = '_reporter.state'
161 # Values for STATE_STATE
162 RS_NONE, RS_SYNCING, RS_AVOID, RS_SYNCED = range(0, 4)
164 def on_session_open(self, peer, eventargs):
165 session = eventargs['session']
166 if session[base.Opener.STATE_PCEPTYPE] == base.Opener.PCEPTYPE_STATELESS:
167 session[Reporter.STATE_STATE] = Reporter.RS_NONE
169 statedb = self._get_statedb(peer, session)
170 session[Reporter.STATE_STATEDB] = statedb
171 session[Reporter.STATE_AWAITED] = self._get_awaited(peer, session)
173 local_open = session[base.Opener.STATE_LOCAL_OPEN]
174 remote_open = session[base.Opener.STATE_REMOTE_OPEN]
175 avoid = statedb.can_avoid(pce_open=local_open, pcc_open=remote_open)
177 base._LOGGER.info('Session "%s" has valid database version "%s"'
178 % (session, statedb.version)
180 state = Reporter.RS_AVOID
182 state = Reporter.RS_SYNCING
183 session[Reporter.STATE_STATE] = state
185 def on_message(self, peer, eventargs):
186 session = eventargs['session']
187 statedb = session[Reporter.STATE_STATEDB]
190 message = eventargs['message']
191 if not isinstance(message, _message.PCRpt):
193 awaited = session[Reporter.STATE_AWAITED]
194 use_dbv = session[base.Opener.STATE_USE_DBV]
195 state = session[Reporter.STATE_STATE]
197 for report in message.poll('report'):
198 report_lsp = report.poll('lsp')
199 if report_lsp is None:
200 peer.make_pcep_error(
204 send = _message.code.Error.MandatoryObjectMissing_LSP,
208 lsp = statedb[report_lsp.lsp_id]
211 name = report_lsp.get(_message.tlv.LspSymbolicName)
213 base._LOGGER.error('New LSP "%s" missing name in "%s"'
214 % (report_lsp, session)
216 # FIXME: should be a PCEP error
219 lsp = _lsp.Lsp(name=name, lsp_id=report_lsp.lsp_id)
223 peer.emit('on_state_report', session=session,
224 lsp = lsp, report = report, new = new
228 statedb.get_version(report_lsp, use_dbv)
229 except ValueError as value_error:
230 peer.make_pcep_error(
233 cause = (report, value_error),
234 send = _message.code.Error.MandatoryObjectMissing_DBV,
243 for key in awaited.match(report):
244 peer.emit('on_await_report', session=session,
245 key = key, arrived = report
248 if not report_lsp.synchronize:
249 if state != Reporter.RS_SYNCED:
250 state = Reporter.RS_SYNCED
251 session[Reporter.STATE_STATE] = state
252 peer.emit('on_synchronized', session=session,
255 elif state == Reporter.RS_AVOID:
256 base._LOGGER.warning('Session "%s": synchronization not avoided'
259 state = Reporter.RS_SYNCING
260 session[Reporter.STATE_STATE] = state
262 base._LOGGER.error('Session "%s": already synchronized'
266 def on_timeout(self, peer, eventargs):
267 session = eventargs['session']
268 now = eventargs['now']
269 awaited = session[Reporter.STATE_AWAITED]
272 outs = awaited.out(now)
274 peer.emit('on_await_report', session=session,
275 key = out, arrived = None
278 def timeout(self, session):
279 awaited = session[Reporter.STATE_AWAITED]
280 return None if awaited is None else awaited.timeout
282 def _get_statedb(self, peer, session):
283 """Create state database for session"""
284 return _lsp.StateDb(self, peer, session)
286 def _get_awaited(self, peer, session):
287 """Create awaited report database for session"""
288 return _lsp.Awaited(self, session)
290 def await(self, session, criteria, timeout=None):
291 """Watch for a [set of] state reports satisfying criteria."""
292 awaited = session[Reporter.STATE_AWAITED]
293 for key, criterion in criteria.items():
294 awaited.add(self._get_await(session, key, criterion, timeout))
296 def _get_await(self, session, key, criterion, timeout=None):
297 """Transform a criterium into an Await object."""
298 return _lsp.Await(key, criterion, criterion.get('timeout', timeout))