From: Robert Varga Date: Fri, 22 Apr 2022 14:35:14 +0000 (+0200) Subject: Properly inject Ticker into PCEPSessionImpl X-Git-Tag: v0.16.15~23 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=9b2e32cef8616dfa5dd5e6260f0f1f694cb43855;p=bgpcep.git Properly inject Ticker into PCEPSessionImpl Rather than having a mutable static field with the potential pitfalls from that, use an explicit field to hold the associated ticker and inject it from tests. Also cleanup the class and do not leak ChannelFuture. The new field's overhead is eliminating by using a shared constant for the keepalive message. Change-Id: Id6f5cd263ea71b50e8a942c804a26ebb047edcc2 Signed-off-by: Robert Varga (cherry picked from commit 5f7ccf8acbfcfae2191f9713bf7594d65d11b16d) --- diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java index 436a855099..068678f7e7 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java @@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Ticker; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -58,8 +57,12 @@ import org.slf4j.LoggerFactory; */ @VisibleForTesting public class PCEPSessionImpl extends SimpleChannelInboundHandler implements PCEPSession { - private static final long MINUTE = TimeUnit.MINUTES.toNanos(1); - private static Ticker TICKER = Ticker.systemTicker(); + private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class); + private static final long MINUTE_NANOS = TimeUnit.MINUTES.toNanos(1); + private static final Keepalive KEEPALIVE = new KeepaliveBuilder() + .setKeepaliveMessage(new KeepaliveMessageBuilder().build()) + .build(); + /** * System.nanoTime value about when was sent the last message Protected to be updated also in tests. */ @@ -84,8 +87,6 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem */ private final Open remoteOpen; - private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class); - private int maxUnknownMessages; // True if the listener should not be notified about events @@ -94,24 +95,30 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem private final Channel channel; - private final Keepalive kaMessage = - new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build(); private final PCEPSessionState sessionState; + private final Ticker ticker; + PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel, final Open localOpen, final Open remoteOpen) { + this(listener, maxUnknownMessages, channel, localOpen, remoteOpen, Ticker.systemTicker()); + } + + @VisibleForTesting + PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel, + final Open localOpen, final Open remoteOpen, final Ticker ticker) { this.listener = requireNonNull(listener); this.channel = requireNonNull(channel); this.localOpen = requireNonNull(localOpen); this.remoteOpen = requireNonNull(remoteOpen); - this.lastMessageReceivedAt = TICKER.read(); + this.ticker = requireNonNull(ticker); + lastMessageReceivedAt = ticker.read(); if (maxUnknownMessages != 0) { this.maxUnknownMessages = maxUnknownMessages; } - if (getDeadTimerValue() != 0) { channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS); } @@ -122,15 +129,15 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId()); - this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel); + sessionState = new PCEPSessionState(remoteOpen, localOpen, channel); } public final Integer getKeepAliveTimerValue() { - return this.localOpen.getKeepalive().intValue(); + return localOpen.getKeepalive().intValue(); } public final Integer getDeadTimerValue() { - return this.remoteOpen.getDeadTimer().intValue(); + return remoteOpen.getDeadTimer().intValue(); } /** @@ -140,16 +147,16 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem * state will become IDLE), that rescheduling won't occur. */ private synchronized void handleDeadTimer() { - final long ct = TICKER.read(); + final long ct = ticker.read(); - final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue()); + final long nextDead = lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue()); - if (this.channel.isActive()) { + if (channel.isActive()) { if (ct >= nextDead) { LOG.debug("DeadTimer expired. {}", new Date()); - this.terminate(TerminationReason.EXP_DEADTIMER); + terminate(TerminationReason.EXP_DEADTIMER); } else { - this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS); + channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS); } } } @@ -161,17 +168,17 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem * starts to execute (the session state will become IDLE), that rescheduling won't occur. */ private void handleKeepaliveTimer() { - final long ct = TICKER.read(); + final long ct = ticker.read(); - long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); + long nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); - if (this.channel.isActive()) { + if (channel.isActive()) { if (ct >= nextKeepalive) { - this.sendMessage(this.kaMessage); - nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); + sendMessage(KEEPALIVE); + nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); } - this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS); + channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS); } } @@ -192,14 +199,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem */ @Override public Future sendMessage(final Message msg) { - final ChannelFuture f = this.channel.writeAndFlush(msg); - this.lastMessageSentAt = TICKER.read(); - this.sessionState.updateLastSentMsg(); + final ChannelFuture f = channel.writeAndFlush(msg); + lastMessageSentAt = ticker.read(); + sessionState.updateLastSentMsg(); if (!(msg instanceof KeepaliveMessage)) { LOG.debug("PCEP Message enqueued: {}", msg); } if (msg instanceof PcerrMessage) { - this.sessionState.setLastSentError(msg); + sessionState.setLastSentError(msg); } f.addListener((ChannelFutureListener) arg -> { @@ -214,14 +221,14 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem } @VisibleForTesting - ChannelFuture closeChannel() { - LOG.info("Closing PCEP session channel: {}", this.channel); - return this.channel.close(); + Future closeChannel() { + LOG.info("Closing PCEP session channel: {}", channel); + return channel.close(); } @VisibleForTesting public synchronized boolean isClosed() { - return this.closed.get(); + return closed.get(); } /** @@ -239,7 +246,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem */ @Override public void close(final TerminationReason reason) { - if (this.closed.getAndSet(true)) { + if (closed.getAndSet(true)) { LOG.debug("Session is already closed."); return; } @@ -257,26 +264,26 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem @Override public Tlvs getRemoteTlvs() { - return this.remoteOpen.getTlvs(); + return remoteOpen.getTlvs(); } @Override public InetAddress getRemoteAddress() { - return ((InetSocketAddress) this.channel.remoteAddress()).getAddress(); + return ((InetSocketAddress) channel.remoteAddress()).getAddress(); } private synchronized void terminate(final TerminationReason reason) { - if (this.closed.get()) { + if (closed.get()) { LOG.debug("Session {} is already closed.", this); return; } close(reason); - this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason)); + listener.onSessionTerminated(this, new PCEPCloseTermination(reason)); } synchronized void endOfInput() { - if (!this.closed.getAndSet(true)) { - this.listener.onSessionDown(this, new IOException("End of input detected. Close the session.")); + if (!closed.getAndSet(true)) { + listener.onSessionDown(this, new IOException("End of input detected. Close the session.")); } } @@ -291,7 +298,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem * @param open Open Object */ private void sendErrorMessage(final PCEPErrors value, final Open open) { - this.sendMessage(Util.createErrorMessage(value, open)); + sendMessage(Util.createErrorMessage(value, open)); } /** @@ -304,15 +311,15 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem */ @VisibleForTesting void handleMalformedMessage(final PCEPErrors error) { - final long ct = TICKER.read(); + final long ct = ticker.read(); this.sendErrorMessage(error); if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) { - this.unknownMessagesTimes.add(ct); - while (ct - this.unknownMessagesTimes.peek() > MINUTE) { - this.unknownMessagesTimes.remove(); + unknownMessagesTimes.add(ct); + while (ct - unknownMessagesTimes.peek() > MINUTE_NANOS) { + unknownMessagesTimes.remove(); } - if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) { - this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS); + if (unknownMessagesTimes.size() > maxUnknownMessages) { + terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS); } } } @@ -324,13 +331,13 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem * @param msg incoming message */ public synchronized void handleMessage(final Message msg) { - if (this.closed.get()) { + if (closed.get()) { LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg); return; } // Update last reception time - this.lastMessageReceivedAt = TICKER.read(); - this.sessionState.updateLastReceivedMsg(); + lastMessageReceivedAt = ticker.read(); + sessionState.updateLastReceivedMsg(); if (!(msg instanceof KeepaliveMessage)) { LOG.debug("PCEP message {} received.", msg); } @@ -346,34 +353,31 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem * session DOWN event. */ close(); - this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason + listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason()))); } else { // This message needs to be handled by the user if (msg instanceof PcerrMessage) { - this.sessionState.setLastReceivedError(msg); + sessionState.setLastReceivedError(msg); } - this.listener.onMessage(this, msg); + listener.onMessage(this, msg); } } @Override public final String toString() { - return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); - } - - private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - toStringHelper.add("channel", this.channel); - toStringHelper.add("localOpen", this.localOpen); - toStringHelper.add("remoteOpen", this.remoteOpen); - return toStringHelper; + return MoreObjects.toStringHelper(this) + .add("channel", channel) + .add("localOpen", localOpen) + .add("remoteOpen", remoteOpen) + .toString(); } @VisibleForTesting @SuppressWarnings("checkstyle:IllegalCatch") void sessionUp() { try { - this.listener.onSessionUp(this); + listener.onSessionUp(this); } catch (final RuntimeException e) { handleException(e); throw e; @@ -382,27 +386,27 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem @VisibleForTesting final Queue getUnknownMessagesTimes() { - return this.unknownMessagesTimes; + return unknownMessagesTimes; } @Override public Messages getMessages() { - return this.sessionState.getMessages(this.unknownMessagesTimes.size()); + return sessionState.getMessages(unknownMessagesTimes.size()); } @Override public LocalPref getLocalPref() { - return this.sessionState.getLocalPref(); + return sessionState.getLocalPref(); } @Override public PeerPref getPeerPref() { - return this.sessionState.getPeerPref(); + return sessionState.getPeerPref(); } @Override public Open getLocalOpen() { - return this.sessionState.getLocalOpen(); + return sessionState.getLocalOpen(); } @Override @@ -421,7 +425,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem @Override public final synchronized void handlerAdded(final ChannelHandlerContext ctx) { - this.sessionUp(); + sessionUp(); } @Override @@ -431,11 +435,6 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler implem @Override public Tlvs getLocalTlvs() { - return this.localOpen.getTlvs(); - } - - @VisibleForTesting - static void setTicker(final Ticker ticker) { - TICKER = ticker; + return localOpen.getTlvs(); } } diff --git a/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java b/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java index b06570a8af..f6ec1f2998 100644 --- a/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java +++ b/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java @@ -9,17 +9,24 @@ package org.opendaylight.protocol.pcep.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.opendaylight.protocol.util.CheckTestUtil.checkEquals; import com.google.common.base.Ticker; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.GlobalEventExecutor; -import java.util.Queue; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.opendaylight.protocol.pcep.PCEPSessionListener; +import org.opendaylight.protocol.pcep.PCEPTerminationReason; import org.opendaylight.protocol.pcep.impl.spi.Util; import org.opendaylight.protocol.pcep.spi.PCEPErrors; -import org.opendaylight.protocol.util.CheckTestUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.app.config.rev160707.pcep.dispatcher.config.TlsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Open; @@ -32,10 +39,8 @@ import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.Uint8; public class FiniteStateMachineTest extends AbstractPCEPSessionTest { - private DefaultPCEPSessionNegotiator serverSession; private DefaultPCEPSessionNegotiator tlsSessionNegotiator; - private final TestTicker ticker = new TestTicker(); @Before public void setup() { @@ -206,53 +211,44 @@ public class FiniteStateMachineTest extends AbstractPCEPSessionTest { } @Test - public void testUnknownMessage() throws Exception { - final SimpleSessionListener client = new SimpleSessionListener(); - final PCEPSessionImpl session = new PCEPSessionImpl(client, 5, this.channel, - this.openMsg.getOpenMessage().getOpen(), this.openMsg.getOpenMessage().getOpen()); - PCEPSessionImpl.setTicker(this.ticker); - session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - final Queue qeue = session.getUnknownMessagesTimes(); - CheckTestUtil.checkEquals(() -> assertEquals(1, qeue.size())); + public void testUnknownMessage() { + final Ticker ticker = mock(Ticker.class); + doReturn( + // session create + 0L, + // first four receive/send + 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, + // a minute has passed since second receive/send + 60000000004L, 60000000005L, + // a minute has passed since third receive/send and the the time gets stuck :) + 60000000006L, 60000000007L).when(ticker).read(); + + final var listener = mock(PCEPSessionListener.class); + final var session = new PCEPSessionImpl(listener, 5, channel, openMsg.getOpenMessage().getOpen(), + openMsg.getOpenMessage().getOpen(), ticker); + + final var qeue = session.getUnknownMessagesTimes(); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(2, qeue.size())); + assertEquals(1, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size())); + assertEquals(2, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size())); + assertEquals(3, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size())); + assertEquals(4, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size())); + assertEquals(3, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size())); + assertEquals(3, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - CheckTestUtil.checkEquals(() -> assertEquals(5, qeue.size())); + assertEquals(4, qeue.size()); session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); - synchronized (client) { - while (client.up) { - client.wait(); - } - } - CheckTestUtil.checkEquals(() -> assertTrue(!client.up)); - } + assertEquals(5, qeue.size()); - private final class TestTicker extends Ticker { - private long counter = 0L; - - TestTicker() { - } - - @Override - public long read() { - if (this.counter == 8) { - this.counter++; - return 60000000003L; - } else if (this.counter == 10) { - this.counter++; - return 60000000006L; - } - return this.counter++; - } + final var captor = ArgumentCaptor.forClass(PCEPTerminationReason.class); + doNothing().when(listener).onSessionTerminated(eq(session), captor.capture()); + session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED); + verify(ticker, times(20)).read(); + assertEquals("TOO_MANY_UNKNOWN_MSGS", captor.getValue().getErrorMessage()); } }