import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.TestProbe;
import java.util.Optional;
import java.util.function.Consumer;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
return new AbortLocalTransactionRequest(identifier, replyTo);
}
- private static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
- return new ConnectionEntryMatcher(request);
- }
-
- private static class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
-
- private final Request request;
-
- private ConnectionEntryMatcher(final Request request) {
- this.request = request;
- }
-
- @Override
- public boolean matches(final Object item) {
- if (!(item instanceof ConnectionEntry)) {
- return false;
- }
- final ConnectionEntry entry = (ConnectionEntry) item;
- return this.request.equals(entry.getRequest());
- }
-
- @Override
- public void describeMismatch(final Object item, final Description description) {
- final ConnectionEntry entry = (ConnectionEntry) item;
- super.describeMismatch(entry.getRequest(), description);
- }
-
- @Override
- public void describeTo(final Description description) {
- description.appendValue(request);
- }
- }
-
-}
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Iterables;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
+
+ private static final FrontendIdentifier FRONTEND =
+ FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("type-1"));
+ private static final ClientIdentifier CLIENT = ClientIdentifier.create(FRONTEND, 0);
+ protected static final LocalHistoryIdentifier HISTORY = new LocalHistoryIdentifier(CLIENT, 0);
+ protected static final TransactionIdentifier TRANSACTION_IDENTIFIER = new TransactionIdentifier(HISTORY, 0);
+ protected T queue;
+ protected ActorSystem system;
+ protected TestProbe probe;
+
+ protected abstract int getMaxInFlightMessages();
+
+ protected abstract T createQueue();
+
+ @Before
+ public void setUp() throws Exception {
+ system = ActorSystem.apply();
+ probe = new TestProbe(system);
+ queue = createQueue();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ @Test
+ public abstract void testCanTransmitCount() throws Exception;
+
+ @Test(expected = UnsupportedOperationException.class)
+ public abstract void testTransmit() throws Exception;
+
+ @Test
+ public void testAsIterable() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ final int sentMessages = getMaxInFlightMessages() + 1;
+ for (int i = 0; i < sentMessages; i++) {
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ }
+ final Iterable<ConnectionEntry> entries = queue.asIterable();
+ Assert.assertEquals(sentMessages, Iterables.size(entries));
+ Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+ }
+
+ @Test
+ public void testTicksStalling() throws Exception {
+ final long now = Ticker.systemTicker().read();
+ Assert.assertEquals(0, queue.ticksStalling(now));
+ }
+
+ @Test
+ public void testCompleteReponseNotMatchingRequest() throws Exception {
+ final long requestSequence = 0L;
+ final long txSequence = 0L;
+ final long sessionId = 0L;
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ //different transaction id
+ final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
+ final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
+ final Optional<TransmittedConnectionEntry> completed1 =
+ queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
+ Assert.assertFalse(completed1.isPresent());
+ //different response sequence
+ final long differentResponseSequence = 1L;
+ final RequestSuccess<?, ?> success2 =
+ new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
+ final Optional<TransmittedConnectionEntry> completed2 =
+ queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
+ Assert.assertFalse(completed2.isPresent());
+ //different tx sequence
+ final long differentTxSequence = 1L;
+ final RequestSuccess<?, ?> success3 =
+ new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
+ final Optional<TransmittedConnectionEntry> completed3 =
+ queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
+ Assert.assertFalse(completed3.isPresent());
+ //different session id
+ final long differentSessionId = 1L;
+ final RequestSuccess<?, ?> success4 =
+ new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
+ final Optional<TransmittedConnectionEntry> completed4 =
+ queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
+ Assert.assertFalse(completed4.isPresent());
+ }
+
+ @Test
+ public void testIsEmpty() throws Exception {
+ Assert.assertTrue(queue.isEmpty());
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ Assert.assertFalse(queue.isEmpty());
+ }
+
+ @Test
+ public void testPeek() throws Exception {
+ final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
+ final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
+ queue.enqueue(entry1, now);
+ queue.enqueue(entry2, now);
+ Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+ }
+
+ @Test
+ public void testPoison() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
+ verify(callback).accept(any(TransactionFailure.class));
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static Consumer<Response<?, ?>> createConsumerMock() {
+ return mock(Consumer.class);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+
+/**
+ * Matcher checks, whether matched {@link ConnectionEntry} tracks provided {@link Request}.
+ */
+class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
+
+ private final Request request;
+
+ /**
+ * Creates a matcher that matches if the examined {@link ConnectionEntry} contains specified request.
+ *
+ * @param request request
+ * @return matcher
+ */
+ public static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
+ return new ConnectionEntryMatcher(request);
+ }
+
+ private ConnectionEntryMatcher(final Request request) {
+ this.request = request;
+ }
+
+ @Override
+ public boolean matches(final Object item) {
+ if (!(item instanceof ConnectionEntry)) {
+ return false;
+ }
+ final ConnectionEntry entry = (ConnectionEntry) item;
+ return this.request.equals(entry.getRequest());
+ }
+
+ @Override
+ public void describeMismatch(final Object item, final Description description) {
+ final ConnectionEntry entry = (ConnectionEntry) item;
+ super.describeMismatch(entry.getRequest(), description);
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendValue(request);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.base.Ticker;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+
+public class HaltedTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Halted> {
+
+ @Override
+ protected int getMaxInFlightMessages() {
+ return 0;
+ }
+
+ @Override
+ protected TransmitQueue.Halted createQueue() {
+ return new TransmitQueue.Halted(0);
+ }
+
+ @Test
+ @Override
+ public void testCanTransmitCount() throws Exception {
+ Assert.assertFalse(queue.canTransmitCount(0) > 0);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ @Override
+ public void testTransmit() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
+ queue.transmit(entry, now);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.Iterables;
+import com.google.common.testing.FakeTicker;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
+
+ private BackendInfo backendInfo;
+
+ @Override
+ protected int getMaxInFlightMessages() {
+ return backendInfo.getMaxMessages();
+ }
+
+ @Override
+ protected TransmitQueue.Transmitting createQueue() {
+ backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
+ return new TransmitQueue.Transmitting(0, backendInfo);
+ }
+
+ @Test
+ public void testComplete() throws Exception {
+ final long sequence1 = 0L;
+ final long sequence2 = 1L;
+ final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
+ final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
+ final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
+ final Consumer<Response<?, ?>> callback1 = createConsumerMock();
+ final Consumer<Response<?, ?>> callback2 = createConsumerMock();
+ final long now1 = Ticker.systemTicker().read();
+ final long now2 = Ticker.systemTicker().read();
+ //enqueue 2 entries
+ queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
+ queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+ final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
+ final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
+ //complete entries in different order
+ final Optional<TransmittedConnectionEntry> completed2 =
+ queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
+ final Optional<TransmittedConnectionEntry> completed1 =
+ queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
+ //check first entry
+ final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
+ Assert.assertEquals(transmittedEntry1.getRequest(), request1);
+ Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
+ Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
+ //check second entry
+ final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
+ Assert.assertEquals(transmittedEntry2.getRequest(), request2);
+ Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
+ Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
+ }
+
+ @Test
+ public void testEnqueueCanTransmit() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ Assert.assertEquals(request, requestEnvelope.getMessage());
+ }
+
+ @Test
+ public void testEnqueueBackendFull() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ final int sentMessages = getMaxInFlightMessages() + 1;
+ for (int i = 0; i < sentMessages; i++) {
+ queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ }
+ for (int i = 0; i < getMaxInFlightMessages(); i++) {
+ probe.expectMsgClass(RequestEnvelope.class);
+ }
+ probe.expectNoMsg();
+ final Iterable<ConnectionEntry> entries = queue.asIterable();
+ Assert.assertEquals(sentMessages, Iterables.size(entries));
+ Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+ }
+
+ @Test
+ @Override
+ public void testCanTransmitCount() throws Exception {
+ Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
+ Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
+ }
+
+ @Test
+ @Override
+ public void testTransmit() throws Exception {
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final long now = Ticker.systemTicker().read();
+ final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
+ queue.transmit(entry, now);
+ final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ Assert.assertEquals(request, requestEnvelope.getMessage());
+ }
+
+ @Test
+ public void testSetForwarder() throws Exception {
+ final FakeTicker ticker = new FakeTicker();
+ ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
+ final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+ final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
+ queue.enqueue(entry, ticker.read());
+ final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
+ final long setForwarderNow = ticker.read();
+ queue.setForwarder(forwarder, setForwarderNow);
+ verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+ final long secondEnqueueNow = ticker.read();
+ queue.enqueue(entry, secondEnqueueNow);
+ verify(forwarder).forwardEntry(entry, secondEnqueueNow);
+ }
+
+}
\ No newline at end of file