2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.pcep.impl;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.MoreObjects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelFutureListener;
17 import io.netty.util.concurrent.Future;
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.InetSocketAddress;
21 import java.util.Date;
22 import java.util.LinkedList;
23 import java.util.Queue;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.protocol.framework.AbstractProtocolSession;
26 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
27 import org.opendaylight.protocol.pcep.PCEPSession;
28 import org.opendaylight.protocol.pcep.PCEPSessionListener;
29 import org.opendaylight.protocol.pcep.TerminationReason;
30 import org.opendaylight.protocol.pcep.impl.spi.Util;
31 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
48 import org.opendaylight.yangtools.yang.binding.DataContainer;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
/**
 * Implementation of PCEPSession. (Not final for testing.)
 */
public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession {
    /**
     * System.nanoTime() value taken when the last message was sent. Protected so that it can also be updated
     * in tests.
     */
    protected volatile long lastMessageSentAt;

    /**
     * System.nanoTime() value taken when the last message was received.
     */
    private long lastMessageReceivedAt;

    // System.nanoTime() stamps of recently reported unknown messages; entries more than one minute older than
    // the newest one are dropped in handleMalformedMessage().
    private final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();

    // Receives the session events (up, down, terminated) and the messages that are passed on to the user.
    private final PCEPSessionListener listener;

    /**
     * Open Object with session characteristics that were accepted by another PCE (sent from this session).
     * Its keepalive value drives the KeepAlive timer.
     */
    private final Open localOpen;

    /**
     * Open Object with session characteristics for this session (sent from another PCE). Its dead timer value
     * drives the DeadTimer.
     */
    private final Open remoteOpen;

    private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);

    // Number of unknown messages tolerated in the tracked one-minute window before the session is terminated.
    private int maxUnknownMessages;

    // True if the listener should not be notified about events
    private boolean closed = false;

    // Channel all messages of this session are written to.
    private final Channel channel;

    // Pre-built Keepalive message sent whenever the KeepAlive timer decides one is due.
    private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();

    // Holder of the session statistics returned by getMessages(), getLocalPref() and getPeerPref().
    private final PCEPSessionState sessionState;
95 PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
96 final Open localOpen, final Open remoteOpen) {
97 this.listener = Preconditions.checkNotNull(listener);
98 this.channel = Preconditions.checkNotNull(channel);
99 this.localOpen = Preconditions.checkNotNull(localOpen);
100 this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
101 this.lastMessageReceivedAt = System.nanoTime();
103 if (maxUnknownMessages != 0) {
104 this.maxUnknownMessages = maxUnknownMessages;
108 if (getDeadTimerValue() != 0) {
109 channel.eventLoop().schedule(new Runnable() {
114 }, getDeadTimerValue(), TimeUnit.SECONDS);
117 if (getKeepAliveTimerValue() != 0) {
118 channel.eventLoop().schedule(new Runnable() {
121 handleKeepaliveTimer();
123 }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
126 LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
127 remoteOpen.getSessionId());
128 this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
131 public final Integer getKeepAliveTimerValue() {
132 return this.localOpen.getKeepalive().intValue();
135 public final Integer getDeadTimerValue() {
136 return this.remoteOpen.getDeadTimer().intValue();
140 * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
141 * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
142 * which the message was received. If the session was closed by the time this method starts to execute (the session
143 * state will become IDLE), that rescheduling won't occur.
145 private synchronized void handleDeadTimer() {
146 final long ct = System.nanoTime();
148 final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
150 if (this.channel.isActive()) {
151 if (ct >= nextDead) {
152 LOG.debug("DeadTimer expired. {}", new Date());
153 this.terminate(TerminationReason.EXP_DEADTIMER);
155 this.channel.eventLoop().schedule(new Runnable() {
160 }, nextDead - ct, TimeUnit.NANOSECONDS);
166 * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
167 * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
168 * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
169 * starts to execute (the session state will become IDLE), that rescheduling won't occur.
171 private void handleKeepaliveTimer() {
172 final long ct = System.nanoTime();
174 long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
176 if (this.channel.isActive()) {
177 if (ct >= nextKeepalive) {
178 this.sendMessage(this.kaMessage);
179 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
182 this.channel.eventLoop().schedule(new Runnable() {
185 handleKeepaliveTimer();
187 }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
192 * Sends message to serialization.
194 * @param msg to be sent
197 public Future<Void> sendMessage(final Message msg) {
198 final ChannelFuture f = this.channel.writeAndFlush(msg);
199 this.lastMessageSentAt = System.nanoTime();
200 this.sessionState.updateLastSentMsg();
201 if (!(msg instanceof KeepaliveMessage)) {
202 LOG.debug("PCEP Message enqueued: {}", msg);
204 if (msg instanceof PcerrMessage) {
205 this.sessionState.setLastSentError(msg);
208 f.addListener(new ChannelFutureListener() {
210 public void operationComplete(final ChannelFuture arg) {
211 if (arg.isSuccess()) {
212 LOG.trace("Message sent to socket: {}", msg);
214 LOG.debug("Message not sent: {}", msg, arg.cause());
223 * Closes PCEP session without sending a Close message, as the channel is no longer active.
226 public void close() {
227 LOG.info("Closing PCEP session: {}", this);
228 this.channel.close();
232 * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
233 * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
234 * inside the session or from the listener, therefore the parent of this session should be informed.
237 public synchronized void close(final TerminationReason reason) {
238 LOG.info("Closing PCEP session: {}", this);
240 this.sendMessage(new CloseBuilder().setCCloseMessage(
241 new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
246 public Tlvs getRemoteTlvs() {
247 return this.remoteOpen.getTlvs();
251 public InetAddress getRemoteAddress() {
252 return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
255 private synchronized void terminate(final TerminationReason reason) {
256 LOG.info("Local PCEP session termination : {}", reason);
257 this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
259 this.sendMessage(new CloseBuilder().setCCloseMessage(
260 new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
265 public synchronized void endOfInput() {
267 this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
272 private void sendErrorMessage(final PCEPErrors value) {
273 this.sendErrorMessage(value, null);
277 * Sends PCEP Error Message with one PCEPError and Open Object.
282 private void sendErrorMessage(final PCEPErrors value, final Open open) {
283 this.sendMessage(Util.createErrorMessage(value, open));
287 * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
288 * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
289 * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
292 * @param error documented error in RFC5440 or draft
295 public void handleMalformedMessage(final PCEPErrors error) {
296 final long ct = System.nanoTime();
297 this.sendErrorMessage(error);
298 if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
299 this.unknownMessagesTimes.add(ct);
300 while (ct - this.unknownMessagesTimes.peek() > TimeUnit.MINUTES.toNanos(1)) {
301 this.unknownMessagesTimes.poll();
303 if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
304 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
310 * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
313 * @param msg incoming message
316 public synchronized void handleMessage(final Message msg) {
317 // Update last reception time
318 this.lastMessageReceivedAt = System.nanoTime();
319 this.sessionState.updateLastReceivedMsg();
320 if (!(msg instanceof KeepaliveMessage)) {
321 LOG.debug("PCEP message {} received.", msg);
323 // Internal message handling. The user does not see these messages
324 if (msg instanceof KeepaliveMessage) {
325 // Do nothing, the timer has been already reset
326 } else if (msg instanceof OpenMessage) {
327 this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
328 } else if (msg instanceof CloseMessage) {
330 * Session is up, we are reporting all messages to user. One notable
331 * exception is CLOSE message, which needs to be converted into a
332 * session DOWN event.
336 // This message needs to be handled by the user
337 if (msg instanceof PcerrMessage) {
338 this.sessionState.setLastReceivedError(msg);
340 this.listener.onMessage(this, msg);
345 public final String toString() {
346 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
349 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
350 toStringHelper.add("channel", this.channel);
351 toStringHelper.add("localOpen", this.localOpen);
352 toStringHelper.add("remoteOpen", this.remoteOpen);
353 return toStringHelper;
358 public void sessionUp() {
359 this.listener.onSessionUp(this);
363 protected final Queue<Long> getUnknownMessagesTimes() {
364 return this.unknownMessagesTimes;
368 public Messages getMessages() {
369 return this.sessionState.getMessages(this.unknownMessagesTimes.size());
373 public LocalPref getLocalPref() {
374 return this.sessionState.getLocalPref();
378 public PeerPref getPeerPref() {
379 return this.sessionState.getPeerPref();
383 public Class<? extends DataContainer> getImplementedInterface() {
384 throw new UnsupportedOperationException();
387 public void resetStats() {
388 this.sessionState.reset();