From: Andrej Mak Date: Thu, 9 Mar 2017 12:29:02 +0000 (+0100) Subject: Add TransmitQueue unit tests X-Git-Tag: release/carbon~166 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a00880a67730250c359680310bce0c10cefc31d4 Add TransmitQueue unit tests Change-Id: Id7067db80a3ba4f8befec1b8bccf764db2419eb7 Signed-off-by: Andrej Mak --- diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java index 14ddf047d4..550dd9fa51 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java @@ -14,6 +14,7 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -21,8 +22,6 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; import java.util.Optional; import java.util.function.Consumer; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -164,37 +163,4 @@ public abstract class AbstractClientConnectionTest request) { - return new ConnectionEntryMatcher(request); - } - - private static class ConnectionEntryMatcher extends BaseMatcher { - - private final Request request; - - private ConnectionEntryMatcher(final Request request) { - this.request = request; - } - - @Override - public boolean matches(final Object item) { - if (!(item instanceof ConnectionEntry)) { - return false; - } - final ConnectionEntry entry = (ConnectionEntry) item; - return this.request.equals(entry.getRequest()); - } - - @Override - public void describeMismatch(final Object item, final Description description) { - final ConnectionEntry entry = (ConnectionEntry) item; - super.describeMismatch(entry.getRequest(), description); - } - - @Override - public void describeTo(final Description description) { - description.appendValue(request); - } - } - -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java new file mode 100644 index 0000000000..8e07804633 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import static org.hamcrest.CoreMatchers.everyItem; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import com.google.common.base.Ticker; +import com.google.common.collect.Iterables; +import java.util.Optional; +import java.util.function.Consumer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.commands.TransactionFailure; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +public abstract class AbstractTransmitQueueTest { + + private static final FrontendIdentifier FRONTEND = + FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("type-1")); + private static final ClientIdentifier CLIENT = ClientIdentifier.create(FRONTEND, 0); + protected static final LocalHistoryIdentifier HISTORY = new LocalHistoryIdentifier(CLIENT, 0); + protected static final TransactionIdentifier TRANSACTION_IDENTIFIER = new TransactionIdentifier(HISTORY, 0); + protected T queue; + protected ActorSystem system; + protected TestProbe probe; + + protected abstract int getMaxInFlightMessages(); + + protected abstract T createQueue(); + + @Before + public void setUp() throws Exception { + system = ActorSystem.apply(); + probe = new TestProbe(system); + queue = createQueue(); + } + + @After + public void tearDown() throws Exception { + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public abstract void testCanTransmitCount() throws Exception; + + @Test(expected = UnsupportedOperationException.class) + public abstract void testTransmit() throws Exception; + + @Test + public void testAsIterable() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + final int sentMessages = getMaxInFlightMessages() + 1; + for (int i = 0; i < sentMessages; i++) { + queue.enqueue(new ConnectionEntry(request, callback, now), now); + } + final Iterable entries = queue.asIterable(); + Assert.assertEquals(sentMessages, Iterables.size(entries)); + Assert.assertThat(entries, everyItem(entryWithRequest(request))); + } + + @Test + public void testTicksStalling() throws Exception { + final long now = Ticker.systemTicker().read(); + Assert.assertEquals(0, queue.ticksStalling(now)); + } + + @Test + public void testCompleteReponseNotMatchingRequest() throws Exception { + final long requestSequence = 0L; + final long txSequence = 0L; + final long sessionId = 0L; + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + queue.enqueue(new ConnectionEntry(request, callback, now), now); + //different transaction id + final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L); + final RequestSuccess success1 = new TransactionPurgeResponse(anotherTxId, requestSequence); + final Optional completed1 = + queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now); + Assert.assertFalse(completed1.isPresent()); + //different response sequence + final long differentResponseSequence = 1L; + final RequestSuccess success2 = + new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence); + final Optional completed2 = + queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now); + Assert.assertFalse(completed2.isPresent()); + //different tx sequence + final long differentTxSequence = 1L; + final RequestSuccess success3 = + new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence); + final Optional completed3 = + queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now); + Assert.assertFalse(completed3.isPresent()); + //different session id + final long differentSessionId = 1L; + final RequestSuccess success4 = + new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence); + final Optional completed4 = + queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now); + Assert.assertFalse(completed4.isPresent()); + } + + @Test + public void testIsEmpty() throws Exception { + Assert.assertTrue(queue.isEmpty()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + queue.enqueue(new ConnectionEntry(request, callback, now), now); + Assert.assertFalse(queue.isEmpty()); + } + + @Test + public void testPeek() throws Exception { + final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now); + final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now); + queue.enqueue(entry1, now); + queue.enqueue(entry2, now); + Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest()); + } + + @Test + public void testPoison() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail"))); + verify(callback).accept(any(TransactionFailure.class)); + Assert.assertTrue(queue.isEmpty()); + } + + @SuppressWarnings("unchecked") + protected static Consumer> createConsumerMock() { + return mock(Consumer.class); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java new file mode 100644 index 0000000000..d2d2fd8bd9 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectionEntryMatcher.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.opendaylight.controller.cluster.access.concepts.Request; + +/** + * Matcher checks, whether matched {@link ConnectionEntry} tracks provided {@link Request}. + */ +class ConnectionEntryMatcher extends BaseMatcher { + + private final Request request; + + /** + * Creates a matcher that matches if the examined {@link ConnectionEntry} contains specified request. + * + * @param request request + * @return matcher + */ + public static ConnectionEntryMatcher entryWithRequest(final Request request) { + return new ConnectionEntryMatcher(request); + } + + private ConnectionEntryMatcher(final Request request) { + this.request = request; + } + + @Override + public boolean matches(final Object item) { + if (!(item instanceof ConnectionEntry)) { + return false; + } + final ConnectionEntry entry = (ConnectionEntry) item; + return this.request.equals(entry.getRequest()); + } + + @Override + public void describeMismatch(final Object item, final Description description) { + final ConnectionEntry entry = (ConnectionEntry) item; + super.describeMismatch(entry.getRequest(), description); + } + + @Override + public void describeTo(final Description description) { + description.appendValue(request); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java new file mode 100644 index 0000000000..80394001a8 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/HaltedTransmitQueueTest.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.base.Ticker; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.Response; + +public class HaltedTransmitQueueTest extends AbstractTransmitQueueTest { + + @Override + protected int getMaxInFlightMessages() { + return 0; + } + + @Override + protected TransmitQueue.Halted createQueue() { + return new TransmitQueue.Halted(0); + } + + @Test + @Override + public void testCanTransmitCount() throws Exception { + Assert.assertFalse(queue.canTransmitCount(0) > 0); + } + + @Test(expected = UnsupportedOperationException.class) + @Override + public void testTransmit() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + final ConnectionEntry entry = new ConnectionEntry(request, callback, now); + queue.transmit(entry, now); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java new file mode 100644 index 0000000000..4859591384 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import static org.hamcrest.CoreMatchers.everyItem; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; + +import com.google.common.base.Ticker; +import com.google.common.collect.Iterables; +import com.google.common.testing.FakeTicker; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest { + + private BackendInfo backendInfo; + + @Override + protected int getMaxInFlightMessages() { + return backendInfo.getMaxMessages(); + } + + @Override + protected TransmitQueue.Transmitting createQueue() { + backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3); + return new TransmitQueue.Transmitting(0, backendInfo); + } + + @Test + public void testComplete() throws Exception { + final long sequence1 = 0L; + final long sequence2 = 1L; + final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref()); + final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L); + final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); + final Consumer> callback1 = createConsumerMock(); + final Consumer> callback2 = createConsumerMock(); + final long now1 = Ticker.systemTicker().read(); + final long now2 = Ticker.systemTicker().read(); + //enqueue 2 entries + queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1); + queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2); + final RequestSuccess success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1); + final RequestSuccess success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2); + //complete entries in different order + final Optional completed2 = + queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2); + final Optional completed1 = + queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1); + //check first entry + final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new); + Assert.assertEquals(transmittedEntry1.getRequest(), request1); + Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1); + Assert.assertEquals(transmittedEntry1.getCallback(), callback1); + //check second entry + final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new); + Assert.assertEquals(transmittedEntry2.getRequest(), request2); + Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2); + Assert.assertEquals(transmittedEntry2.getCallback(), callback2); + } + + @Test + public void testEnqueueCanTransmit() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + queue.enqueue(new ConnectionEntry(request, callback, now), now); + final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + Assert.assertEquals(request, requestEnvelope.getMessage()); + } + + @Test + public void testEnqueueBackendFull() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + final int sentMessages = getMaxInFlightMessages() + 1; + for (int i = 0; i < sentMessages; i++) { + queue.enqueue(new ConnectionEntry(request, callback, now), now); + } + for (int i = 0; i < getMaxInFlightMessages(); i++) { + probe.expectMsgClass(RequestEnvelope.class); + } + probe.expectNoMsg(); + final Iterable entries = queue.asIterable(); + Assert.assertEquals(sentMessages, Iterables.size(entries)); + Assert.assertThat(entries, everyItem(entryWithRequest(request))); + } + + @Test + @Override + public void testCanTransmitCount() throws Exception { + Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); + Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); + } + + @Test + @Override + public void testTransmit() throws Exception { + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final long now = Ticker.systemTicker().read(); + final ConnectionEntry entry = new ConnectionEntry(request, callback, now); + queue.transmit(entry, now); + final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + Assert.assertEquals(request, requestEnvelope.getMessage()); + } + + @Test + public void testSetForwarder() throws Exception { + final FakeTicker ticker = new FakeTicker(); + ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Consumer> callback = createConsumerMock(); + final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); + queue.enqueue(entry, ticker.read()); + final ReconnectForwarder forwarder = mock(ReconnectForwarder.class); + final long setForwarderNow = ticker.read(); + queue.setForwarder(forwarder, setForwarderNow); + verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow)); + final long secondEnqueueNow = ticker.read(); + queue.enqueue(entry, secondEnqueueNow); + verify(forwarder).forwardEntry(entry, secondEnqueueNow); + } + +} \ No newline at end of file