Add new revision for pcep types model
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
index 22ff18e53e4c088556ee7bf8c20ceb8e828bc3a3..db5c8b50b8f6417a9bcca86a788601a1da86a05f 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSession;
@@ -33,23 +34,22 @@ import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.TerminationReason;
 import org.opendaylight.protocol.pcep.impl.spi.Util;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,8 +63,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     /**
      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
      */
-    @VisibleForTesting
-    protected volatile long lastMessageSentAt;
+    private volatile long lastMessageSentAt;
 
     /**
      * System.nanoTime value about when was received the last message
@@ -91,7 +90,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     // True if the listener should not be notified about events
     @GuardedBy("this")
-    private boolean closed = false;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private final Channel channel;
 
@@ -100,7 +99,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     private final PCEPSessionState sessionState;
 
     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
-        final Open localOpen, final Open remoteOpen) {
+            final Open localOpen, final Open remoteOpen) {
         this.listener = requireNonNull(listener);
         this.channel = requireNonNull(channel);
         this.localOpen = requireNonNull(localOpen);
@@ -121,7 +120,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         }
 
         LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
-            remoteOpen.getSessionId());
+                remoteOpen.getSessionId());
         this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
     }
 
@@ -160,7 +159,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
-    private  void handleKeepaliveTimer() {
+    private void handleKeepaliveTimer() {
         final long ct = TICKER.read();
 
         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@@ -175,6 +174,16 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         }
     }
 
+    /**
+     * Handle exception occurred in the PCEP session. The session in error state should be closed
+     * properly so that it can be restored later.
+     */
+    @VisibleForTesting
+    void handleException(final Throwable cause) {
+        LOG.error("Exception captured for session {}, closing session.", this, cause);
+        terminate(TerminationReason.UNKNOWN);
+    }
+
     /**
      * Sends message to serialization.
      *
@@ -211,7 +220,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @VisibleForTesting
     public synchronized boolean isClosed() {
-        return this.closed;
+        return this.closed.get();
     }
 
     /**
@@ -228,12 +237,11 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * inside the session or from the listener, therefore the parent of this session should be informed.
      */
     @Override
-    public synchronized void close(final TerminationReason reason) {
-        if (this.closed) {
+    public void close(final TerminationReason reason) {
+        if (this.closed.getAndSet(true)) {
             LOG.debug("Session is already closed.");
             return;
         }
-        this.closed = true;
         // only send close message when the reason is provided
         if (reason != null) {
             LOG.info("Closing PCEP session with reason {}: {}", reason, this);
@@ -256,7 +264,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     private synchronized void terminate(final TerminationReason reason) {
-        if (this.closed) {
+        if (this.closed.get()) {
             LOG.debug("Session {} is already closed.", this);
             return;
         }
@@ -264,10 +272,9 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
     }
 
-    public synchronized void endOfInput() {
-        if (!this.closed) {
+    synchronized void endOfInput() {
+        if (!this.closed.getAndSet(true)) {
             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
-            this.closed = true;
         }
     }
 
@@ -294,13 +301,13 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param error documented error in RFC5440 or draft
      */
     @VisibleForTesting
-    public void handleMalformedMessage(final PCEPErrors error) {
+    void handleMalformedMessage(final PCEPErrors error) {
         final long ct = TICKER.read();
         this.sendErrorMessage(error);
         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
             this.unknownMessagesTimes.add(ct);
-            while ( ct - this.unknownMessagesTimes.peek() > MINUTE) {
-                this.unknownMessagesTimes.poll();
+            while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+                final Long poll = this.unknownMessagesTimes.poll();
             }
             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
                 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
@@ -315,7 +322,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param msg incoming message
      */
     public synchronized void handleMessage(final Message msg) {
-        if (this.closed) {
+        if (this.closed.get()) {
             LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
             return;
         }
@@ -337,7 +344,8 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
              * session DOWN event.
              */
             close();
-            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
+            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+                    .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
         } else {
             // This message needs to be handled by the user
             if (msg instanceof PcerrMessage) {
@@ -360,12 +368,17 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @VisibleForTesting
-    public void sessionUp() {
-        this.listener.onSessionUp(this);
+    void sessionUp() {
+        try {
+            this.listener.onSessionUp(this);
+        } catch (final Exception e) {
+            handleException(e);
+            throw e;
+        }
     }
 
     @VisibleForTesting
-    protected final Queue<Long> getUnknownMessagesTimes() {
+    final Queue<Long> getUnknownMessagesTimes() {
         return this.unknownMessagesTimes;
     }
 
@@ -385,19 +398,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @Override
-    public Class<? extends DataContainer> getImplementedInterface() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void resetStats() {
-        this.sessionState.reset();
+    public Open getLocalOpen() {
+        return this.sessionState.getLocalOpen();
     }
 
     @Override
-    public final void channelInactive(final ChannelHandlerContext ctx) {
+    public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
         LOG.debug("Channel {} inactive.", ctx.channel());
-        this.endOfInput();
+        endOfInput();
 
         try {
             super.channelInactive(ctx);
@@ -407,18 +415,23 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @Override
-    protected final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+    protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
         LOG.debug("Message was received: {}", msg);
-        this.handleMessage(msg);
+        handleMessage(msg);
     }
 
     @Override
-    public final void handlerAdded(final ChannelHandlerContext ctx) {
+    public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
         this.sessionUp();
     }
 
     @Override
-    public Tlvs localSessionCharacteristics() {
+    public  synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        handleException(cause);
+    }
+
+    @Override
+    public Tlvs getLocalTlvs() {
         return this.localOpen.getTlvs();
     }