*/
package org.opendaylight.protocol.pcep.impl;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetAddress;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.spi.Util;
import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Implementation of PCEPSession. (Not final for testing.)
*/
@VisibleForTesting
-public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession {
+public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
+ private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
+ private static Ticker TICKER = Ticker.systemTicker();
/**
* System.nanoTime value about when was sent the last message Protected to be updated also in tests.
*/
- @VisibleForTesting
- protected volatile long lastMessageSentAt;
+ private volatile long lastMessageSentAt;
/**
* System.nanoTime value about when was received the last message
*/
private long lastMessageReceivedAt;
- private final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
+ private final Queue<Long> unknownMessagesTimes = new LinkedList<>();
private final PCEPSessionListener listener;
private int maxUnknownMessages;
// True if the listener should not be notified about events
- private boolean closed = false;
+ @GuardedBy("this")
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private final Channel channel;
private final PCEPSessionState sessionState;
PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
- final Open localOpen, final Open remoteOpen) {
- this.listener = Preconditions.checkNotNull(listener);
- this.channel = Preconditions.checkNotNull(channel);
- this.localOpen = Preconditions.checkNotNull(localOpen);
- this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
- this.lastMessageReceivedAt = System.nanoTime();
+ final Open localOpen, final Open remoteOpen) {
+ this.listener = requireNonNull(listener);
+ this.channel = requireNonNull(channel);
+ this.localOpen = requireNonNull(localOpen);
+ this.remoteOpen = requireNonNull(remoteOpen);
+ this.lastMessageReceivedAt = TICKER.read();
if (maxUnknownMessages != 0) {
this.maxUnknownMessages = maxUnknownMessages;
if (getDeadTimerValue() != 0) {
- channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleDeadTimer();
- }
- }, getDeadTimerValue(), TimeUnit.SECONDS);
+ channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
}
if (getKeepAliveTimerValue() != 0) {
- channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleKeepaliveTimer();
- }
- }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
+ channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS);
}
LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
- remoteOpen.getSessionId());
+ remoteOpen.getSessionId());
this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
}
* state will become IDLE), that rescheduling won't occur.
*/
private synchronized void handleDeadTimer() {
- final long ct = System.nanoTime();
+ final long ct = TICKER.read();
final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
LOG.debug("DeadTimer expired. {}", new Date());
this.terminate(TerminationReason.EXP_DEADTIMER);
} else {
- this.channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleDeadTimer();
- }
- }, nextDead - ct, TimeUnit.NANOSECONDS);
+ this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
}
}
}
* KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
* starts to execute (the session state will become IDLE), that rescheduling won't occur.
*/
- private void handleKeepaliveTimer() {
- final long ct = System.nanoTime();
+ private void handleKeepaliveTimer() {
+ final long ct = TICKER.read();
long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
}
- this.channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleKeepaliveTimer();
- }
- }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+ this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
}
}
+ /**
+ * Handle exception occurred in the PCEP session. The session in error state should be closed
+ * properly so that it can be restored later.
+ */
+ @VisibleForTesting
+ void handleException(final Throwable cause) {
+ LOG.error("Exception captured for session {}, closing session.", this, cause);
+ terminate(TerminationReason.UNKNOWN);
+ }
+
/**
* Sends message to serialization.
*
@Override
public Future<Void> sendMessage(final Message msg) {
final ChannelFuture f = this.channel.writeAndFlush(msg);
- this.lastMessageSentAt = System.nanoTime();
+ this.lastMessageSentAt = TICKER.read();
this.sessionState.updateLastSentMsg();
if (!(msg instanceof KeepaliveMessage)) {
LOG.debug("PCEP Message enqueued: {}", msg);
this.sessionState.setLastSentError(msg);
}
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture arg) {
- if (arg.isSuccess()) {
- LOG.trace("Message sent to socket: {}", msg);
- } else {
- LOG.debug("Message not sent: {}", msg, arg.cause());
- }
+ f.addListener((ChannelFutureListener) arg -> {
+ if (arg.isSuccess()) {
+ LOG.trace("Message sent to socket: {}", msg);
+ } else {
+ LOG.debug("Message not sent: {}", msg, arg.cause());
}
});
return f;
}
+ @VisibleForTesting
+ ChannelFuture closeChannel() {
+ LOG.info("Closing PCEP session channel: {}", this.channel);
+ return this.channel.close();
+ }
+
+ @VisibleForTesting
+ public synchronized boolean isClosed() {
+ return this.closed.get();
+ }
+
/**
* Closes PCEP session without sending a Close message, as the channel is no longer active.
*/
@Override
- public void close() {
- LOG.info("Closing PCEP session: {}", this);
- this.channel.close();
+ public synchronized void close() {
+ close(null);
}
/**
* inside the session or from the listener, therefore the parent of this session should be informed.
*/
@Override
- public synchronized void close(final TerminationReason reason) {
- LOG.info("Closing PCEP session: {}", this);
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
+ public void close(final TerminationReason reason) {
+ if (this.closed.getAndSet(true)) {
+ LOG.debug("Session is already closed.");
+ return;
+ }
+ // only send close message when the reason is provided
+ if (reason != null) {
+ LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+ sendMessage(new CloseBuilder().setCCloseMessage(
+ new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+ } else {
+ LOG.info("Closing PCEP session: {}", this);
+ }
+ closeChannel();
}
@Override
}
private synchronized void terminate(final TerminationReason reason) {
- LOG.info("Local PCEP session termination : {}", reason);
+ if (this.closed.get()) {
+ LOG.debug("Session {} is already closed.", this);
+ return;
+ }
+ close(reason);
this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
}
- @Override
- public synchronized void endOfInput() {
- if (!this.closed) {
+ synchronized void endOfInput() {
+ if (!this.closed.getAndSet(true)) {
this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
- this.closed = true;
}
}
* @param error documented error in RFC5440 or draft
*/
@VisibleForTesting
- public void handleMalformedMessage(final PCEPErrors error) {
- final long ct = System.nanoTime();
+ void handleMalformedMessage(final PCEPErrors error) {
+ final long ct = TICKER.read();
this.sendErrorMessage(error);
if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
this.unknownMessagesTimes.add(ct);
- while (ct - this.unknownMessagesTimes.peek() > TimeUnit.MINUTES.toNanos(1)) {
- this.unknownMessagesTimes.poll();
+ while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+ final Long poll = this.unknownMessagesTimes.poll();
}
if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
*
* @param msg incoming message
*/
- @Override
public synchronized void handleMessage(final Message msg) {
+ if (this.closed.get()) {
+ LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+ return;
+ }
// Update last reception time
- this.lastMessageReceivedAt = System.nanoTime();
+ this.lastMessageReceivedAt = TICKER.read();
this.sessionState.updateLastReceivedMsg();
if (!(msg instanceof KeepaliveMessage)) {
LOG.debug("PCEP message {} received.", msg);
* exception is CLOSE message, which needs to be converted into a
* session DOWN event.
*/
- this.close();
+ close();
+ this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+ .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
} else {
// This message needs to be handled by the user
if (msg instanceof PcerrMessage) {
return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
}
- protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
toStringHelper.add("channel", this.channel);
toStringHelper.add("localOpen", this.localOpen);
toStringHelper.add("remoteOpen", this.remoteOpen);
return toStringHelper;
}
- @Override
@VisibleForTesting
- public void sessionUp() {
- this.listener.onSessionUp(this);
+ void sessionUp() {
+ try {
+ this.listener.onSessionUp(this);
+ } catch (final Exception e) {
+ handleException(e);
+ throw e;
+ }
}
@VisibleForTesting
- protected final Queue<Long> getUnknownMessagesTimes() {
+ final Queue<Long> getUnknownMessagesTimes() {
return this.unknownMessagesTimes;
}
}
@Override
- public Class<? extends DataContainer> getImplementedInterface() {
- throw new UnsupportedOperationException();
+ public Open getLocalOpen() {
+ return this.sessionState.getLocalOpen();
+ }
+
+ @Override
+ public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ endOfInput();
+
+ try {
+ super.channelInactive(ctx);
+ } catch (final Exception e) {
+ throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+ LOG.debug("Message was received: {}", msg);
+ handleMessage(msg);
}
+
+ @Override
+ public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
+ this.sessionUp();
+ }
+
+ @Override
+ public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ handleException(cause);
+ }
+
@Override
- public void resetStats() {
- this.sessionState.reset();
+ public Tlvs getLocalTlvs() {
+ return this.localOpen.getTlvs();
+ }
+
+ @VisibleForTesting
+ static void setTicker(final Ticker ticker) {
+ TICKER = ticker;
}
}