Add TransmitQueue unit tests 74/53074/5
authorAndrej Mak <andrej.mak@pantheon.tech>
Thu, 9 Mar 2017 12:29:02 +0000 (13:29 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 13 Mar 2017 12:54:10 +0000 (12:54 +0000)
Change-Id: Id7067db80a3ba4f8befec1b8bccf764db2419eb7
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java [new file with mode: 0644]

index 14ddf047d427ed471793be95f64ea65ea1cba20b..550dd9fa518b35b2c94d28f3ed0054863d5ab30c 100644 (file)
@@ -14,6 +14,7 @@ import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -21,8 +22,6 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import java.util.Optional;
 import java.util.function.Consumer;
 import akka.testkit.TestProbe;
 import java.util.Optional;
 import java.util.function.Consumer;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -164,37 +163,4 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
         return new AbortLocalTransactionRequest(identifier, replyTo);
     }
 
         return new AbortLocalTransactionRequest(identifier, replyTo);
     }
 
-    private static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
-        return new ConnectionEntryMatcher(request);
-    }
-
-    private static class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
-
-        private final Request request;
-
-        private ConnectionEntryMatcher(final Request request) {
-            this.request = request;
-        }
-
-        @Override
-        public boolean matches(final Object item) {
-            if (!(item instanceof ConnectionEntry)) {
-                return false;
-            }
-            final ConnectionEntry entry = (ConnectionEntry) item;
-            return this.request.equals(entry.getRequest());
-        }
-
-        @Override
-        public void describeMismatch(final Object item, final Description description) {
-            final ConnectionEntry entry = (ConnectionEntry) item;
-            super.describeMismatch(entry.getRequest(), description);
-        }
-
-        @Override
-        public void describeTo(final Description description) {
-            description.appendValue(request);
-        }
-    }
-
-}
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java
new file mode 100644 (file)
index 0000000..8e07804
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Iterables;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
+
+    private static final FrontendIdentifier FRONTEND =
+            FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("type-1"));
+    private static final ClientIdentifier CLIENT = ClientIdentifier.create(FRONTEND, 0);
+    protected static final LocalHistoryIdentifier HISTORY = new LocalHistoryIdentifier(CLIENT, 0);
+    protected static final TransactionIdentifier TRANSACTION_IDENTIFIER = new TransactionIdentifier(HISTORY, 0);
+    protected T queue;
+    protected ActorSystem system;
+    protected TestProbe probe;
+
+    protected abstract int getMaxInFlightMessages();
+
+    protected abstract T createQueue();
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        probe = new TestProbe(system);
+        queue = createQueue();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Test
+    public abstract void testCanTransmitCount() throws Exception;
+
+    @Test(expected = UnsupportedOperationException.class)
+    public abstract void testTransmit() throws Exception;
+
+    @Test
+    public void testAsIterable() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        final int sentMessages = getMaxInFlightMessages() + 1;
+        for (int i = 0; i < sentMessages; i++) {
+            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        }
+        final Iterable<ConnectionEntry> entries = queue.asIterable();
+        Assert.assertEquals(sentMessages, Iterables.size(entries));
+        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+    }
+
+    @Test
+    public void testTicksStalling() throws Exception {
+        final long now = Ticker.systemTicker().read();
+        Assert.assertEquals(0, queue.ticksStalling(now));
+    }
+
+    @Test
+    public void testCompleteReponseNotMatchingRequest() throws Exception {
+        final long requestSequence = 0L;
+        final long txSequence = 0L;
+        final long sessionId = 0L;
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        //different transaction id
+        final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
+        final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
+        final Optional<TransmittedConnectionEntry> completed1 =
+                queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
+        Assert.assertFalse(completed1.isPresent());
+        //different response sequence
+        final long differentResponseSequence = 1L;
+        final RequestSuccess<?, ?> success2 =
+                new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
+        final Optional<TransmittedConnectionEntry> completed2 =
+                queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
+        Assert.assertFalse(completed2.isPresent());
+        //different tx sequence
+        final long differentTxSequence = 1L;
+        final RequestSuccess<?, ?> success3 =
+                new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
+        final Optional<TransmittedConnectionEntry> completed3 =
+                queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
+        Assert.assertFalse(completed3.isPresent());
+        //different session id
+        final long differentSessionId = 1L;
+        final RequestSuccess<?, ?> success4 =
+                new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
+        final Optional<TransmittedConnectionEntry> completed4 =
+                queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
+        Assert.assertFalse(completed4.isPresent());
+    }
+
+    @Test
+    public void testIsEmpty() throws Exception {
+        Assert.assertTrue(queue.isEmpty());
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        Assert.assertFalse(queue.isEmpty());
+    }
+
+    @Test
+    public void testPeek() throws Exception {
+        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
+        final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
+        queue.enqueue(entry1, now);
+        queue.enqueue(entry2, now);
+        Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+    }
+
+    @Test
+    public void testPoison() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
+        verify(callback).accept(any(TransactionFailure.class));
+        Assert.assertTrue(queue.isEmpty());
+    }
+
+    @SuppressWarnings("unchecked")
+    protected static Consumer<Response<?, ?>> createConsumerMock() {
+        return mock(Consumer.class);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java
new file mode 100644 (file)
index 0000000..d2d2fd8
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+
+/**
+ * Matcher checks, whether matched {@link ConnectionEntry} tracks provided {@link Request}.
+ */
+class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
+
+    private final Request request;
+
+    /**
+     * Creates a matcher that matches if the examined {@link ConnectionEntry} contains specified request.
+     *
+     * @param request request
+     * @return matcher
+     */
+    public static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
+        return new ConnectionEntryMatcher(request);
+    }
+
+    private ConnectionEntryMatcher(final Request request) {
+        this.request = request;
+    }
+
+    @Override
+    public boolean matches(final Object item) {
+        if (!(item instanceof ConnectionEntry)) {
+            return false;
+        }
+        final ConnectionEntry entry = (ConnectionEntry) item;
+        return this.request.equals(entry.getRequest());
+    }
+
+    @Override
+    public void describeMismatch(final Object item, final Description description) {
+        final ConnectionEntry entry = (ConnectionEntry) item;
+        super.describeMismatch(entry.getRequest(), description);
+    }
+
+    @Override
+    public void describeTo(final Description description) {
+        description.appendValue(request);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java
new file mode 100644 (file)
index 0000000..8039400
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.base.Ticker;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+
+public class HaltedTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Halted> {
+
+    @Override
+    protected int getMaxInFlightMessages() {
+        return 0;
+    }
+
+    @Override
+    protected TransmitQueue.Halted createQueue() {
+        return new TransmitQueue.Halted(0);
+    }
+
+    @Test
+    @Override
+    public void testCanTransmitCount() throws Exception {
+        Assert.assertFalse(queue.canTransmitCount(0) > 0);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    @Override
+    public void testTransmit() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
+        queue.transmit(entry, now);
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
new file mode 100644 (file)
index 0000000..4859591
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.Iterables;
+import com.google.common.testing.FakeTicker;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
+
+    private BackendInfo backendInfo;
+
+    @Override
+    protected int getMaxInFlightMessages() {
+        return backendInfo.getMaxMessages();
+    }
+
+    @Override
+    protected TransmitQueue.Transmitting createQueue() {
+        backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
+        return new TransmitQueue.Transmitting(0, backendInfo);
+    }
+
+    @Test
+    public void testComplete() throws Exception {
+        final long sequence1 = 0L;
+        final long sequence2 = 1L;
+        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
+        final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
+        final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
+        final Consumer<Response<?, ?>> callback1 = createConsumerMock();
+        final Consumer<Response<?, ?>> callback2 = createConsumerMock();
+        final long now1 = Ticker.systemTicker().read();
+        final long now2 = Ticker.systemTicker().read();
+        //enqueue 2 entries
+        queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
+        queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+        final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
+        final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
+        //complete entries in different order
+        final Optional<TransmittedConnectionEntry> completed2 =
+                queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
+        final Optional<TransmittedConnectionEntry> completed1 =
+                queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
+        //check first entry
+        final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
+        Assert.assertEquals(transmittedEntry1.getRequest(), request1);
+        Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
+        Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
+        //check second entry
+        final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
+        Assert.assertEquals(transmittedEntry2.getRequest(), request2);
+        Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
+        Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
+    }
+
+    @Test
+    public void testEnqueueCanTransmit() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        Assert.assertEquals(request, requestEnvelope.getMessage());
+    }
+
+    @Test
+    public void testEnqueueBackendFull() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        final int sentMessages = getMaxInFlightMessages() + 1;
+        for (int i = 0; i < sentMessages; i++) {
+            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        }
+        for (int i = 0; i < getMaxInFlightMessages(); i++) {
+            probe.expectMsgClass(RequestEnvelope.class);
+        }
+        probe.expectNoMsg();
+        final Iterable<ConnectionEntry> entries = queue.asIterable();
+        Assert.assertEquals(sentMessages, Iterables.size(entries));
+        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+    }
+
+    @Test
+    @Override
+    public void testCanTransmitCount() throws Exception {
+        Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
+        Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
+    }
+
+    @Test
+    @Override
+    public void testTransmit() throws Exception {
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final long now = Ticker.systemTicker().read();
+        final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
+        queue.transmit(entry, now);
+        final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        Assert.assertEquals(request, requestEnvelope.getMessage());
+    }
+
+    @Test
+    public void testSetForwarder() throws Exception {
+        final FakeTicker ticker = new FakeTicker();
+        ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
+        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+        final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
+        queue.enqueue(entry, ticker.read());
+        final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
+        final long setForwarderNow = ticker.read();
+        queue.setForwarder(forwarder, setForwarderNow);
+        verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+        final long secondEnqueueNow = ticker.read();
+        queue.enqueue(entry, secondEnqueueNow);
+        verify(forwarder).forwardEntry(entry, secondEnqueueNow);
+    }
+
+}
\ No newline at end of file