Properly inject Ticker into PCEPSessionImpl 21/100721/4
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 14:35:14 +0000 (16:35 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 16:23:17 +0000 (18:23 +0200)
Rather than having a mutable static field with the potential pitfalls
from that, use an explicit field to hold the associated ticker and
inject it from tests. Also cleanup the class and do not leak
ChannelFuture. The new field's overhead is eliminating by using a shared
constant for the keepalive message.

Change-Id: Id6f5cd263ea71b50e8a942c804a26ebb047edcc2
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java

index 436a855099aa588e47265b116d3c0e768d3baf20..068678f7e771c9bd27ed88abe0d203993fb924ae 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Ticker;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -58,8 +57,12 @@ import org.slf4j.LoggerFactory;
  */
 @VisibleForTesting
 public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
-    private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
-    private static Ticker TICKER = Ticker.systemTicker();
+    private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
+    private static final long MINUTE_NANOS = TimeUnit.MINUTES.toNanos(1);
+    private static final Keepalive KEEPALIVE = new KeepaliveBuilder()
+        .setKeepaliveMessage(new KeepaliveMessageBuilder().build())
+        .build();
+
     /**
      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
      */
@@ -84,8 +87,6 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      */
     private final Open remoteOpen;
 
-    private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
-
     private int maxUnknownMessages;
 
     // True if the listener should not be notified about events
@@ -94,24 +95,30 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     private final Channel channel;
 
-    private final Keepalive kaMessage =
-        new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
 
     private final PCEPSessionState sessionState;
 
+    private final Ticker ticker;
+
     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
             final Open localOpen, final Open remoteOpen) {
+        this(listener, maxUnknownMessages, channel, localOpen, remoteOpen, Ticker.systemTicker());
+    }
+
+    @VisibleForTesting
+    PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
+            final Open localOpen, final Open remoteOpen, final Ticker ticker) {
         this.listener = requireNonNull(listener);
         this.channel = requireNonNull(channel);
         this.localOpen = requireNonNull(localOpen);
         this.remoteOpen = requireNonNull(remoteOpen);
-        this.lastMessageReceivedAt = TICKER.read();
+        this.ticker = requireNonNull(ticker);
+        lastMessageReceivedAt = ticker.read();
 
         if (maxUnknownMessages != 0) {
             this.maxUnknownMessages = maxUnknownMessages;
         }
 
-
         if (getDeadTimerValue() != 0) {
             channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
         }
@@ -122,15 +129,15 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
         LOG.info("Session {}[{}] <-> {}[{}] started",
             channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId());
-        this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
+        sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
     }
 
     public final Integer getKeepAliveTimerValue() {
-        return this.localOpen.getKeepalive().intValue();
+        return localOpen.getKeepalive().intValue();
     }
 
     public final Integer getDeadTimerValue() {
-        return this.remoteOpen.getDeadTimer().intValue();
+        return remoteOpen.getDeadTimer().intValue();
     }
 
     /**
@@ -140,16 +147,16 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * state will become IDLE), that rescheduling won't occur.
      */
     private synchronized void handleDeadTimer() {
-        final long ct = TICKER.read();
+        final long ct = ticker.read();
 
-        final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
+        final long nextDead = lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
 
-        if (this.channel.isActive()) {
+        if (channel.isActive()) {
             if (ct >= nextDead) {
                 LOG.debug("DeadTimer expired. {}", new Date());
-                this.terminate(TerminationReason.EXP_DEADTIMER);
+                terminate(TerminationReason.EXP_DEADTIMER);
             } else {
-                this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
+                channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
             }
         }
     }
@@ -161,17 +168,17 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
     private void handleKeepaliveTimer() {
-        final long ct = TICKER.read();
+        final long ct = ticker.read();
 
-        long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+        long nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
 
-        if (this.channel.isActive()) {
+        if (channel.isActive()) {
             if (ct >= nextKeepalive) {
-                this.sendMessage(this.kaMessage);
-                nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+                sendMessage(KEEPALIVE);
+                nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
             }
 
-            this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+            channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -192,14 +199,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      */
     @Override
     public Future<Void> sendMessage(final Message msg) {
-        final ChannelFuture f = this.channel.writeAndFlush(msg);
-        this.lastMessageSentAt = TICKER.read();
-        this.sessionState.updateLastSentMsg();
+        final ChannelFuture f = channel.writeAndFlush(msg);
+        lastMessageSentAt = ticker.read();
+        sessionState.updateLastSentMsg();
         if (!(msg instanceof KeepaliveMessage)) {
             LOG.debug("PCEP Message enqueued: {}", msg);
         }
         if (msg instanceof PcerrMessage) {
-            this.sessionState.setLastSentError(msg);
+            sessionState.setLastSentError(msg);
         }
 
         f.addListener((ChannelFutureListener) arg -> {
@@ -214,14 +221,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     }
 
     @VisibleForTesting
-    ChannelFuture closeChannel() {
-        LOG.info("Closing PCEP session channel: {}", this.channel);
-        return this.channel.close();
+    Future<Void> closeChannel() {
+        LOG.info("Closing PCEP session channel: {}", channel);
+        return channel.close();
     }
 
     @VisibleForTesting
     public synchronized boolean isClosed() {
-        return this.closed.get();
+        return closed.get();
     }
 
     /**
@@ -239,7 +246,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      */
     @Override
     public void close(final TerminationReason reason) {
-        if (this.closed.getAndSet(true)) {
+        if (closed.getAndSet(true)) {
             LOG.debug("Session is already closed.");
             return;
         }
@@ -257,26 +264,26 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @Override
     public Tlvs getRemoteTlvs() {
-        return this.remoteOpen.getTlvs();
+        return remoteOpen.getTlvs();
     }
 
     @Override
     public InetAddress getRemoteAddress() {
-        return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
+        return ((InetSocketAddress) channel.remoteAddress()).getAddress();
     }
 
     private synchronized void terminate(final TerminationReason reason) {
-        if (this.closed.get()) {
+        if (closed.get()) {
             LOG.debug("Session {} is already closed.", this);
             return;
         }
         close(reason);
-        this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+        listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
     }
 
     synchronized void endOfInput() {
-        if (!this.closed.getAndSet(true)) {
-            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+        if (!closed.getAndSet(true)) {
+            listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
         }
     }
 
@@ -291,7 +298,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param open Open Object
      */
     private void sendErrorMessage(final PCEPErrors value, final Open open) {
-        this.sendMessage(Util.createErrorMessage(value, open));
+        sendMessage(Util.createErrorMessage(value, open));
     }
 
     /**
@@ -304,15 +311,15 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      */
     @VisibleForTesting
     void handleMalformedMessage(final PCEPErrors error) {
-        final long ct = TICKER.read();
+        final long ct = ticker.read();
         this.sendErrorMessage(error);
         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
-            this.unknownMessagesTimes.add(ct);
-            while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
-                this.unknownMessagesTimes.remove();
+            unknownMessagesTimes.add(ct);
+            while (ct - unknownMessagesTimes.peek() > MINUTE_NANOS) {
+                unknownMessagesTimes.remove();
             }
-            if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
-                this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
+            if (unknownMessagesTimes.size() > maxUnknownMessages) {
+                terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
             }
         }
     }
@@ -324,13 +331,13 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param msg incoming message
      */
     public synchronized void handleMessage(final Message msg) {
-        if (this.closed.get()) {
+        if (closed.get()) {
             LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
             return;
         }
         // Update last reception time
-        this.lastMessageReceivedAt = TICKER.read();
-        this.sessionState.updateLastReceivedMsg();
+        lastMessageReceivedAt = ticker.read();
+        sessionState.updateLastReceivedMsg();
         if (!(msg instanceof KeepaliveMessage)) {
             LOG.debug("PCEP message {} received.", msg);
         }
@@ -346,34 +353,31 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
              * session DOWN event.
              */
             close();
-            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+            listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
                     .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
         } else {
             // This message needs to be handled by the user
             if (msg instanceof PcerrMessage) {
-                this.sessionState.setLastReceivedError(msg);
+                sessionState.setLastReceivedError(msg);
             }
-            this.listener.onMessage(this, msg);
+            listener.onMessage(this, msg);
         }
     }
 
     @Override
     public final String toString() {
-        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
-    }
-
-    private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        toStringHelper.add("channel", this.channel);
-        toStringHelper.add("localOpen", this.localOpen);
-        toStringHelper.add("remoteOpen", this.remoteOpen);
-        return toStringHelper;
+        return MoreObjects.toStringHelper(this)
+            .add("channel", channel)
+            .add("localOpen", localOpen)
+            .add("remoteOpen", remoteOpen)
+            .toString();
     }
 
     @VisibleForTesting
     @SuppressWarnings("checkstyle:IllegalCatch")
     void sessionUp() {
         try {
-            this.listener.onSessionUp(this);
+            listener.onSessionUp(this);
         } catch (final RuntimeException e) {
             handleException(e);
             throw e;
@@ -382,27 +386,27 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @VisibleForTesting
     final Queue<Long> getUnknownMessagesTimes() {
-        return this.unknownMessagesTimes;
+        return unknownMessagesTimes;
     }
 
     @Override
     public Messages getMessages() {
-        return this.sessionState.getMessages(this.unknownMessagesTimes.size());
+        return sessionState.getMessages(unknownMessagesTimes.size());
     }
 
     @Override
     public LocalPref getLocalPref() {
-        return this.sessionState.getLocalPref();
+        return sessionState.getLocalPref();
     }
 
     @Override
     public PeerPref getPeerPref() {
-        return this.sessionState.getPeerPref();
+        return sessionState.getPeerPref();
     }
 
     @Override
     public Open getLocalOpen() {
-        return this.sessionState.getLocalOpen();
+        return sessionState.getLocalOpen();
     }
 
     @Override
@@ -421,7 +425,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @Override
     public final synchronized void handlerAdded(final ChannelHandlerContext ctx) {
-        this.sessionUp();
+        sessionUp();
     }
 
     @Override
@@ -431,11 +435,6 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @Override
     public Tlvs getLocalTlvs() {
-        return this.localOpen.getTlvs();
-    }
-
-    @VisibleForTesting
-    static void setTicker(final Ticker ticker) {
-        TICKER = ticker;
+        return localOpen.getTlvs();
     }
 }
index e58c67a8138b592e230c22509ef4c1d1bae6169c..9858d20de75ea0ff010536332a0096ce91c0312b 100644 (file)
@@ -9,17 +9,24 @@ package org.opendaylight.protocol.pcep.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.opendaylight.protocol.util.CheckTestUtil.checkEquals;
 
 import com.google.common.base.Ticker;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import java.util.Queue;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.protocol.pcep.PCEPSessionListener;
+import org.opendaylight.protocol.pcep.PCEPTerminationReason;
 import org.opendaylight.protocol.pcep.impl.spi.Util;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.protocol.util.CheckTestUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.app.config.rev160707.pcep.dispatcher.config.TlsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Open;
@@ -32,10 +39,8 @@ import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.Uint8;
 
 public class FiniteStateMachineTest extends AbstractPCEPSessionTest {
-
     private DefaultPCEPSessionNegotiator serverSession;
     private DefaultPCEPSessionNegotiator tlsSessionNegotiator;
-    private final TestTicker ticker = new TestTicker();
 
     @Before
     public void setup() {
@@ -206,53 +211,44 @@ public class FiniteStateMachineTest extends AbstractPCEPSessionTest {
     }
 
     @Test
-    public void testUnknownMessage() throws Exception {
-        final SimpleSessionListener client = new SimpleSessionListener();
-        final PCEPSessionImpl session = new PCEPSessionImpl(client, 5, channel,
-            openMsg.getOpenMessage().getOpen(), openMsg.getOpenMessage().getOpen());
-        PCEPSessionImpl.setTicker(ticker);
-        session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        final Queue<Long> qeue = session.getUnknownMessagesTimes();
-        CheckTestUtil.checkEquals(() -> assertEquals(1, qeue.size()));
+    public void testUnknownMessage() {
+        final Ticker ticker = mock(Ticker.class);
+        doReturn(
+            // session create
+            0L,
+            // first four receive/send
+            1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L,
+            // a minute has passed since second receive/send
+            60000000004L, 60000000005L,
+            // a minute has passed since third receive/send and the the time gets stuck :)
+            60000000006L, 60000000007L).when(ticker).read();
+
+        final var listener = mock(PCEPSessionListener.class);
+        final var session = new PCEPSessionImpl(listener, 5, channel, openMsg.getOpenMessage().getOpen(),
+            openMsg.getOpenMessage().getOpen(), ticker);
+
+        final var qeue = session.getUnknownMessagesTimes();
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(2, qeue.size()));
+        assertEquals(1, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+        assertEquals(2, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size()));
+        assertEquals(3, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+        assertEquals(4, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+        assertEquals(3, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size()));
+        assertEquals(3, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        CheckTestUtil.checkEquals(() -> assertEquals(5, qeue.size()));
+        assertEquals(4, qeue.size());
         session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
-        synchronized (client) {
-            while (client.up) {
-                client.wait();
-            }
-        }
-        CheckTestUtil.checkEquals(() -> assertTrue(!client.up));
-    }
+        assertEquals(5, qeue.size());
 
-    private static final class TestTicker extends Ticker {
-        private long counter = 0L;
-
-        TestTicker() {
-        }
-
-        @Override
-        public long read() {
-            if (counter == 8) {
-                counter++;
-                return 60000000003L;
-            } else if (counter == 10) {
-                counter++;
-                return 60000000006L;
-            }
-            return counter++;
-        }
+        final var captor = ArgumentCaptor.forClass(PCEPTerminationReason.class);
+        doNothing().when(listener).onSessionTerminated(eq(session), captor.capture());
+        session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
+        verify(ticker, times(20)).read();
+        assertEquals("TOO_MANY_UNKNOWN_MSGS", captor.getValue().getErrorMessage());
     }
 }