Add new revision for pcep types model
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
index 71469745c0ecc3d3a79807051a1d78cf281a37da..db5c8b50b8f6417a9bcca86a788601a1da86a05f 100644 (file)
@@ -7,10 +7,11 @@
  */
 package org.opendaylight.protocol.pcep.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -25,29 +26,30 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.TerminationReason;
 import org.opendaylight.protocol.pcep.impl.spi.Util;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +63,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     /**
      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
      */
-    @VisibleForTesting
-    protected volatile long lastMessageSentAt;
+    private volatile long lastMessageSentAt;
 
     /**
      * System.nanoTime value about when was received the last message
@@ -88,7 +89,8 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     private int maxUnknownMessages;
 
     // True if the listener should not be notified about events
-    private boolean closed = false;
+    @GuardedBy("this")
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private final Channel channel;
 
@@ -97,11 +99,11 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     private final PCEPSessionState sessionState;
 
     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
-        final Open localOpen, final Open remoteOpen) {
-        this.listener = Preconditions.checkNotNull(listener);
-        this.channel = Preconditions.checkNotNull(channel);
-        this.localOpen = Preconditions.checkNotNull(localOpen);
-        this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
+            final Open localOpen, final Open remoteOpen) {
+        this.listener = requireNonNull(listener);
+        this.channel = requireNonNull(channel);
+        this.localOpen = requireNonNull(localOpen);
+        this.remoteOpen = requireNonNull(remoteOpen);
         this.lastMessageReceivedAt = TICKER.read();
 
         if (maxUnknownMessages != 0) {
@@ -118,7 +120,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         }
 
         LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
-            remoteOpen.getSessionId());
+                remoteOpen.getSessionId());
         this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
     }
 
@@ -157,7 +159,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
-    private  void handleKeepaliveTimer() {
+    private void handleKeepaliveTimer() {
         final long ct = TICKER.read();
 
         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@@ -172,6 +174,16 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         }
     }
 
+    /**
+     * Handle exception occurred in the PCEP session. The session in error state should be closed
+     * properly so that it can be restored later.
+     */
+    @VisibleForTesting
+    void handleException(final Throwable cause) {
+        LOG.error("Exception captured for session {}, closing session.", this, cause);
+        terminate(TerminationReason.UNKNOWN);
+    }
+
     /**
      * Sends message to serialization.
      *
@@ -202,17 +214,21 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @VisibleForTesting
     ChannelFuture closeChannel() {
-        LOG.info("Closing PCEP session: {}", this);
+        LOG.info("Closing PCEP session channel: {}", this.channel);
         return this.channel.close();
     }
 
+    @VisibleForTesting
+    public synchronized boolean isClosed() {
+        return this.closed.get();
+    }
+
     /**
      * Closes PCEP session without sending a Close message, as the channel is no longer active.
      */
     @Override
-    public void close() {
-        LOG.info("Closing PCEP session: {}", this);
-        closeChannel();
+    public synchronized void close() {
+        close(null);
     }
 
     /**
@@ -221,12 +237,20 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * inside the session or from the listener, therefore the parent of this session should be informed.
      */
     @Override
-    public synchronized void close(final TerminationReason reason) {
-        LOG.info("Closing PCEP session: {}", this);
-        this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.close();
+    public void close(final TerminationReason reason) {
+        if (this.closed.getAndSet(true)) {
+            LOG.debug("Session is already closed.");
+            return;
+        }
+        // only send close message when the reason is provided
+        if (reason != null) {
+            LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+            sendMessage(new CloseBuilder().setCCloseMessage(
+                    new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        } else {
+            LOG.info("Closing PCEP session: {}", this);
+        }
+        closeChannel();
     }
 
     @Override
@@ -239,26 +263,18 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
     }
 
-    private synchronized void closeWithoutMessage(final TerminationReason reason) {
-        LOG.info("Closing PCEP session without sending msg: {}", reason);
-        this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-        this.closed = true;
-        this.close();
-    }
-
     private synchronized void terminate(final TerminationReason reason) {
-        LOG.info("Local PCEP session termination : {}", reason);
+        if (this.closed.get()) {
+            LOG.debug("Session {} is already closed.", this);
+            return;
+        }
+        close(reason);
         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-        this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.close();
     }
 
-    public synchronized void endOfInput() {
-        if (!this.closed) {
+    synchronized void endOfInput() {
+        if (!this.closed.getAndSet(true)) {
             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
-            this.closed = true;
         }
     }
 
@@ -285,13 +301,13 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param error documented error in RFC5440 or draft
      */
     @VisibleForTesting
-    public void handleMalformedMessage(final PCEPErrors error) {
+    void handleMalformedMessage(final PCEPErrors error) {
         final long ct = TICKER.read();
         this.sendErrorMessage(error);
         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
             this.unknownMessagesTimes.add(ct);
-            while ( ct - this.unknownMessagesTimes.peek() > MINUTE) {
-                this.unknownMessagesTimes.poll();
+            while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+                final Long poll = this.unknownMessagesTimes.poll();
             }
             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
                 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
@@ -306,6 +322,10 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param msg incoming message
      */
     public synchronized void handleMessage(final Message msg) {
+        if (this.closed.get()) {
+            LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+            return;
+        }
         // Update last reception time
         this.lastMessageReceivedAt = TICKER.read();
         this.sessionState.updateLastReceivedMsg();
@@ -323,7 +343,9 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
              * exception is CLOSE message, which needs to be converted into a
              * session DOWN event.
              */
-            this.closeWithoutMessage(TerminationReason.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason()));
+            close();
+            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+                    .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
         } else {
             // This message needs to be handled by the user
             if (msg instanceof PcerrMessage) {
@@ -346,12 +368,17 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @VisibleForTesting
-    public void sessionUp() {
-        this.listener.onSessionUp(this);
+    void sessionUp() {
+        try {
+            this.listener.onSessionUp(this);
+        } catch (final Exception e) {
+            handleException(e);
+            throw e;
+        }
     }
 
     @VisibleForTesting
-    protected final Queue<Long> getUnknownMessagesTimes() {
+    final Queue<Long> getUnknownMessagesTimes() {
         return this.unknownMessagesTimes;
     }
 
@@ -371,19 +398,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @Override
-    public Class<? extends DataContainer> getImplementedInterface() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void resetStats() {
-        this.sessionState.reset();
+    public Open getLocalOpen() {
+        return this.sessionState.getLocalOpen();
     }
 
     @Override
-    public final void channelInactive(final ChannelHandlerContext ctx) {
+    public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
         LOG.debug("Channel {} inactive.", ctx.channel());
-        this.endOfInput();
+        endOfInput();
 
         try {
             super.channelInactive(ctx);
@@ -393,18 +415,23 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @Override
-    protected final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+    protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
         LOG.debug("Message was received: {}", msg);
-        this.handleMessage(msg);
+        handleMessage(msg);
     }
 
     @Override
-    public final void handlerAdded(final ChannelHandlerContext ctx) {
+    public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
         this.sessionUp();
     }
 
     @Override
-    public Tlvs localSessionCharacteristics() {
+    public  synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        handleException(cause);
+    }
+
+    @Override
+    public Tlvs getLocalTlvs() {
         return this.localOpen.getTlvs();
     }