*/
package org.opendaylight.protocol.pcep.impl;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.TerminationReason;
import org.opendaylight.protocol.pcep.impl.spi.Util;
import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* System.nanoTime value about when was sent the last message Protected to be updated also in tests.
*/
- @VisibleForTesting
- protected volatile long lastMessageSentAt;
+ private volatile long lastMessageSentAt;
/**
* System.nanoTime value about when was received the last message
private int maxUnknownMessages;
// True if the listener should not be notified about events
- private boolean closed = false;
+ @GuardedBy("this")
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private final Channel channel;
private final PCEPSessionState sessionState;
PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
- final Open localOpen, final Open remoteOpen) {
- this.listener = Preconditions.checkNotNull(listener);
- this.channel = Preconditions.checkNotNull(channel);
- this.localOpen = Preconditions.checkNotNull(localOpen);
- this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
+ final Open localOpen, final Open remoteOpen) {
+ this.listener = requireNonNull(listener);
+ this.channel = requireNonNull(channel);
+ this.localOpen = requireNonNull(localOpen);
+ this.remoteOpen = requireNonNull(remoteOpen);
this.lastMessageReceivedAt = TICKER.read();
if (maxUnknownMessages != 0) {
}
LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
- remoteOpen.getSessionId());
+ remoteOpen.getSessionId());
this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
}
* KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
* starts to execute (the session state will become IDLE), that rescheduling won't occur.
*/
- private void handleKeepaliveTimer() {
+ private void handleKeepaliveTimer() {
final long ct = TICKER.read();
long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
}
}
+ /**
+ * Handle exception occurred in the PCEP session. The session in error state should be closed
+ * properly so that it can be restored later.
+ */
+ @VisibleForTesting
+ void handleException(final Throwable cause) {
+ LOG.error("Exception captured for session {}, closing session.", this, cause);
+ terminate(TerminationReason.UNKNOWN);
+ }
+
/**
* Sends message to serialization.
*
@VisibleForTesting
ChannelFuture closeChannel() {
- LOG.info("Closing PCEP session: {}", this);
+ LOG.info("Closing PCEP session channel: {}", this.channel);
return this.channel.close();
}
+ @VisibleForTesting
+ public synchronized boolean isClosed() {
+ return this.closed.get();
+ }
+
/**
* Closes PCEP session without sending a Close message, as the channel is no longer active.
*/
@Override
- public void close() {
- LOG.info("Closing PCEP session: {}", this);
- closeChannel();
+ public synchronized void close() {
+ close(null);
}
/**
* inside the session or from the listener, therefore the parent of this session should be informed.
*/
@Override
- public synchronized void close(final TerminationReason reason) {
- LOG.info("Closing PCEP session: {}", this);
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
+ public void close(final TerminationReason reason) {
+ if (this.closed.getAndSet(true)) {
+ LOG.debug("Session is already closed.");
+ return;
+ }
+ // only send close message when the reason is provided
+ if (reason != null) {
+ LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+ sendMessage(new CloseBuilder().setCCloseMessage(
+ new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+ } else {
+ LOG.info("Closing PCEP session: {}", this);
+ }
+ closeChannel();
}
@Override
return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
}
- private synchronized void closeWithoutMessage(final TerminationReason reason) {
- LOG.info("Closing PCEP session without sending msg: {}", reason);
- this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.close();
- }
-
private synchronized void terminate(final TerminationReason reason) {
- LOG.info("Local PCEP session termination : {}", reason);
+ if (this.closed.get()) {
+ LOG.debug("Session {} is already closed.", this);
+ return;
+ }
+ close(reason);
this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
- this.closed = true;
- this.sendMessage(new CloseBuilder().setCCloseMessage(
- new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
- this.close();
}
- public synchronized void endOfInput() {
- if (!this.closed) {
+ synchronized void endOfInput() {
+ if (!this.closed.getAndSet(true)) {
this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
- this.closed = true;
}
}
* @param error documented error in RFC5440 or draft
*/
@VisibleForTesting
- public void handleMalformedMessage(final PCEPErrors error) {
+ void handleMalformedMessage(final PCEPErrors error) {
final long ct = TICKER.read();
this.sendErrorMessage(error);
if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
this.unknownMessagesTimes.add(ct);
- while ( ct - this.unknownMessagesTimes.peek() > MINUTE) {
- this.unknownMessagesTimes.poll();
+ while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+ final Long poll = this.unknownMessagesTimes.poll();
}
if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
* @param msg incoming message
*/
public synchronized void handleMessage(final Message msg) {
+ if (this.closed.get()) {
+ LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+ return;
+ }
// Update last reception time
this.lastMessageReceivedAt = TICKER.read();
this.sessionState.updateLastReceivedMsg();
* exception is CLOSE message, which needs to be converted into a
* session DOWN event.
*/
- this.closeWithoutMessage(TerminationReason.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason()));
+ close();
+ this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+ .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
} else {
// This message needs to be handled by the user
if (msg instanceof PcerrMessage) {
}
@VisibleForTesting
- public void sessionUp() {
- this.listener.onSessionUp(this);
+ void sessionUp() {
+ try {
+ this.listener.onSessionUp(this);
+ } catch (final Exception e) {
+ handleException(e);
+ throw e;
+ }
}
@VisibleForTesting
- protected final Queue<Long> getUnknownMessagesTimes() {
+ final Queue<Long> getUnknownMessagesTimes() {
return this.unknownMessagesTimes;
}
}
@Override
- public Class<? extends DataContainer> getImplementedInterface() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void resetStats() {
- this.sessionState.reset();
+ public Open getLocalOpen() {
+ return this.sessionState.getLocalOpen();
}
@Override
- public final void channelInactive(final ChannelHandlerContext ctx) {
+ public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
LOG.debug("Channel {} inactive.", ctx.channel());
- this.endOfInput();
+ endOfInput();
try {
super.channelInactive(ctx);
}
@Override
- protected final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+ protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
LOG.debug("Message was received: {}", msg);
- this.handleMessage(msg);
+ handleMessage(msg);
}
@Override
- public final void handlerAdded(final ChannelHandlerContext ctx) {
+ public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
this.sessionUp();
}
@Override
- public Tlvs localSessionCharacteristics() {
+ public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ handleException(cause);
+ }
+
+ @Override
+ public Tlvs getLocalTlvs() {
return this.localOpen.getTlvs();
}