Add new revision for pcep types model
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
index 07bb6386afddf6a49866b8b33cae2492e43b6f99..db5c8b50b8f6417a9bcca86a788601a1da86a05f 100644 (file)
@@ -7,13 +7,17 @@
  */
 package org.opendaylight.protocol.pcep.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Ticker;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -22,24 +26,30 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.spi.Util;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,19 +57,20 @@ import org.slf4j.LoggerFactory;
  * Implementation of PCEPSession. (Not final for testing.)
  */
 @VisibleForTesting
-public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession, PCEPSessionRuntimeMXBean {
+public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
+    private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
+    private static Ticker TICKER = Ticker.systemTicker();
     /**
      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
      */
-    @VisibleForTesting
-    protected volatile long lastMessageSentAt;
+    private volatile long lastMessageSentAt;
 
     /**
      * System.nanoTime value about when was received the last message
      */
     private long lastMessageReceivedAt;
 
-    private final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
+    private final Queue<Long> unknownMessagesTimes = new LinkedList<>();
 
     private final PCEPSessionListener listener;
 
@@ -75,26 +86,25 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
 
     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
 
-    private int sentMsgCount = 0;
-
-    private int receivedMsgCount = 0;
-
     private int maxUnknownMessages;
 
     // True if the listener should not be notified about events
-    private boolean closed = false;
+    @GuardedBy("this")
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private final Channel channel;
 
     private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
 
+    private final PCEPSessionState sessionState;
+
     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
-        final Open localOpen, final Open remoteOpen) {
-        this.listener = Preconditions.checkNotNull(listener);
-        this.channel = Preconditions.checkNotNull(channel);
-        this.localOpen = Preconditions.checkNotNull(localOpen);
-        this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
-        this.lastMessageReceivedAt = System.nanoTime();
+            final Open localOpen, final Open remoteOpen) {
+        this.listener = requireNonNull(listener);
+        this.channel = requireNonNull(channel);
+        this.localOpen = requireNonNull(localOpen);
+        this.remoteOpen = requireNonNull(remoteOpen);
+        this.lastMessageReceivedAt = TICKER.read();
 
         if (maxUnknownMessages != 0) {
             this.maxUnknownMessages = maxUnknownMessages;
@@ -102,25 +112,24 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
 
 
         if (getDeadTimerValue() != 0) {
-            channel.eventLoop().schedule(new Runnable() {
-                @Override
-                public void run() {
-                    handleDeadTimer();
-                }
-            }, getDeadTimerValue(), TimeUnit.SECONDS);
+            channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
         }
 
         if (getKeepAliveTimerValue() != 0) {
-            channel.eventLoop().schedule(new Runnable() {
-                @Override
-                public void run() {
-                    handleKeepaliveTimer();
-                }
-            }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
+            channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS);
         }
 
-        LOG.info("Session {}[{}] <-> {}[{}] started", getLocalAddress(), localOpen.getSessionId(), getRemoteAddress(),
-            remoteOpen.getSessionId());
+        LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
+                remoteOpen.getSessionId());
+        this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
+    }
+
+    public final Integer getKeepAliveTimerValue() {
+        return this.localOpen.getKeepalive().intValue();
+    }
+
+    public final Integer getDeadTimerValue() {
+        return this.remoteOpen.getDeadTimer().intValue();
     }
 
     /**
@@ -130,21 +139,16 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
      * state will become IDLE), that rescheduling won't occur.
      */
     private synchronized void handleDeadTimer() {
-        final long ct = System.nanoTime();
+        final long ct = TICKER.read();
 
         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
 
         if (this.channel.isActive()) {
             if (ct >= nextDead) {
                 LOG.debug("DeadTimer expired. {}", new Date());
-                this.terminate(TerminationReason.ExpDeadtimer);
+                this.terminate(TerminationReason.EXP_DEADTIMER);
             } else {
-                this.channel.eventLoop().schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleDeadTimer();
-                    }
-                }, nextDead - ct, TimeUnit.NANOSECONDS);
+                this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
             }
         }
     }
@@ -155,8 +159,8 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
-    private  void handleKeepaliveTimer() {
-        final long ct = System.nanoTime();
+    private void handleKeepaliveTimer() {
+        final long ct = TICKER.read();
 
         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
 
@@ -166,15 +170,20 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
             }
 
-            this.channel.eventLoop().schedule(new Runnable() {
-                @Override
-                public void run() {
-                    handleKeepaliveTimer();
-                }
-            }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+            this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
         }
     }
 
+    /**
+     * Handle exception occurred in the PCEP session. The session in error state should be closed
+     * properly so that it can be restored later.
+     */
+    @VisibleForTesting
+    void handleException(final Throwable cause) {
+        LOG.error("Exception captured for session {}, closing session.", this, cause);
+        terminate(TerminationReason.UNKNOWN);
+    }
+
     /**
      * Sends message to serialization.
      *
@@ -183,33 +192,43 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
     @Override
     public Future<Void> sendMessage(final Message msg) {
         final ChannelFuture f = this.channel.writeAndFlush(msg);
-        this.lastMessageSentAt = System.nanoTime();
+        this.lastMessageSentAt = TICKER.read();
+        this.sessionState.updateLastSentMsg();
         if (!(msg instanceof KeepaliveMessage)) {
             LOG.debug("PCEP Message enqueued: {}", msg);
         }
-        this.sentMsgCount++;
-
-        f.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(final ChannelFuture arg) {
-                if (arg.isSuccess()) {
-                    LOG.trace("Message sent to socket: {}", msg);
-                } else {
-                    LOG.debug("Message not sent: {}", msg, arg.cause());
-                }
+        if (msg instanceof PcerrMessage) {
+            this.sessionState.setLastSentError(msg);
+        }
+
+        f.addListener((ChannelFutureListener) arg -> {
+            if (arg.isSuccess()) {
+                LOG.trace("Message sent to socket: {}", msg);
+            } else {
+                LOG.debug("Message not sent: {}", msg, arg.cause());
             }
         });
 
         return f;
     }
 
+    @VisibleForTesting
+    ChannelFuture closeChannel() {
+        LOG.info("Closing PCEP session channel: {}", this.channel);
+        return this.channel.close();
+    }
+
+    @VisibleForTesting
+    public synchronized boolean isClosed() {
+        return this.closed.get();
+    }
+
     /**
      * Closes PCEP session without sending a Close message, as the channel is no longer active.
      */
     @Override
-    public void close() {
-        LOG.info("Closing PCEP session: {}", this);
-        this.channel.close();
+    public synchronized void close() {
+        close(null);
     }
 
     /**
@@ -218,12 +237,20 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
      * inside the session or from the listener, therefore the parent of this session should be informed.
      */
     @Override
-    public synchronized void close(final TerminationReason reason) {
-        LOG.info("Closing PCEP session: {}", this);
-        this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.channel.close();
+    public void close(final TerminationReason reason) {
+        if (this.closed.getAndSet(true)) {
+            LOG.debug("Session is already closed.");
+            return;
+        }
+        // only send close message when the reason is provided
+        if (reason != null) {
+            LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+            sendMessage(new CloseBuilder().setCCloseMessage(
+                    new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        } else {
+            LOG.info("Closing PCEP session: {}", this);
+        }
+        closeChannel();
     }
 
     @Override
@@ -233,33 +260,21 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
 
     @Override
     public InetAddress getRemoteAddress() {
-        if (this.channel.parent() != null) {
-            return ((InetSocketAddress) this.channel.localAddress()).getAddress();
-        }
         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
     }
 
-    private InetAddress getLocalAddress() {
-        if (this.channel.parent() != null) {
-            return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
-        }
-        return ((InetSocketAddress) this.channel.localAddress()).getAddress();
-    }
-
     private synchronized void terminate(final TerminationReason reason) {
-        LOG.info("Local PCEP session termination : {}", reason);
+        if (this.closed.get()) {
+            LOG.debug("Session {} is already closed.", this);
+            return;
+        }
+        close(reason);
         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-        this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.close();
     }
 
-    @Override
-    public synchronized void endOfInput() {
-        if (!this.closed) {
+    synchronized void endOfInput() {
+        if (!this.closed.getAndSet(true)) {
             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
-            this.closed = true;
         }
     }
 
@@ -286,16 +301,16 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
      * @param error documented error in RFC5440 or draft
      */
     @VisibleForTesting
-    public void handleMalformedMessage(final PCEPErrors error) {
-        final long ct = System.nanoTime();
+    void handleMalformedMessage(final PCEPErrors error) {
+        final long ct = TICKER.read();
         this.sendErrorMessage(error);
         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
             this.unknownMessagesTimes.add(ct);
-            while (ct - this.unknownMessagesTimes.peek() > 60 * 1E9) {
-                this.unknownMessagesTimes.poll();
+            while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+                final Long poll = this.unknownMessagesTimes.poll();
             }
             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
-                this.terminate(TerminationReason.TooManyUnknownMsg);
+                this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
             }
         }
     }
@@ -306,11 +321,14 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
      *
      * @param msg incoming message
      */
-    @Override
     public synchronized void handleMessage(final Message msg) {
+        if (this.closed.get()) {
+            LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+            return;
+        }
         // Update last reception time
-        this.lastMessageReceivedAt = System.nanoTime();
-        this.receivedMsgCount++;
+        this.lastMessageReceivedAt = TICKER.read();
+        this.sessionState.updateLastReceivedMsg();
         if (!(msg instanceof KeepaliveMessage)) {
             LOG.debug("PCEP message {} received.", msg);
         }
@@ -325,77 +343,100 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
              * exception is CLOSE message, which needs to be converted into a
              * session DOWN event.
              */
-            this.close();
+            close();
+            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+                    .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
         } else {
             // This message needs to be handled by the user
+            if (msg instanceof PcerrMessage) {
+                this.sessionState.setLastReceivedError(msg);
+            }
             this.listener.onMessage(this, msg);
         }
     }
 
-    /**
-     * @return the sentMsgCount
-     */
-
     @Override
-    public final Integer getSentMsgCount() {
-        return this.sentMsgCount;
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
     }
 
-    /**
-     * @return the receivedMsgCount
-     */
+    private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        toStringHelper.add("channel", this.channel);
+        toStringHelper.add("localOpen", this.localOpen);
+        toStringHelper.add("remoteOpen", this.remoteOpen);
+        return toStringHelper;
+    }
+
+    @VisibleForTesting
+    void sessionUp() {
+        try {
+            this.listener.onSessionUp(this);
+        } catch (final Exception e) {
+            handleException(e);
+            throw e;
+        }
+    }
+
+    @VisibleForTesting
+    final Queue<Long> getUnknownMessagesTimes() {
+        return this.unknownMessagesTimes;
+    }
 
     @Override
-    public final Integer getReceivedMsgCount() {
-        return this.receivedMsgCount;
+    public Messages getMessages() {
+        return this.sessionState.getMessages(this.unknownMessagesTimes.size());
     }
 
     @Override
-    public final Integer getDeadTimerValue() {
-        return Integer.valueOf(this.remoteOpen.getDeadTimer());
+    public LocalPref getLocalPref() {
+        return this.sessionState.getLocalPref();
     }
 
     @Override
-    public final Integer getKeepAliveTimerValue() {
-        return Integer.valueOf(this.localOpen.getKeepalive());
+    public PeerPref getPeerPref() {
+        return this.sessionState.getPeerPref();
     }
 
     @Override
-    public final String getPeerAddress() {
-        final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress();
-        return a.getHostName();
+    public Open getLocalOpen() {
+        return this.sessionState.getLocalOpen();
     }
 
     @Override
-    public void tearDown() {
-        this.close();
+    public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        endOfInput();
+
+        try {
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
     }
 
     @Override
-    public final String toString() {
-        return addToStringAttributes(Objects.toStringHelper(this)).toString();
+    protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+        LOG.debug("Message was received: {}", msg);
+        handleMessage(msg);
     }
 
-    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        toStringHelper.add("channel", this.channel);
-        toStringHelper.add("localOpen", this.localOpen);
-        toStringHelper.add("remoteOpen", this.remoteOpen);
-        return toStringHelper;
+    @Override
+    public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
+        this.sessionUp();
     }
 
     @Override
-    @VisibleForTesting
-    public void sessionUp() {
-        this.listener.onSessionUp(this);
+    public  synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        handleException(cause);
     }
 
     @Override
-    public String getNodeIdentifier() {
-        return "";
+    public Tlvs getLocalTlvs() {
+        return this.localOpen.getTlvs();
     }
 
     @VisibleForTesting
-    protected final Queue<Long> getUnknownMessagesTimes() {
-        return this.unknownMessagesTimes;
+    static void setTicker(final Ticker ticker) {
+        TICKER = ticker;
     }
 }