Poison entries outside of main lock
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / AbstractTransmitQueueTest.java
index b5f1bdac7e6a01f9d0d8eeb18d53609998b1b958..7bebd7ac74b228a4e794be31aaa132b2ca212cd9 100644 (file)
@@ -8,23 +8,23 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Ticker;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -55,113 +54,111 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
     protected abstract T createQueue();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         system = ActorSystem.apply();
         probe = new TestProbe(system);
         queue = createQueue();
     }
 
     @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system);
+    public void tearDown() {
+        TestKit.shutdownActorSystem(system);
     }
 
     @Test
-    public abstract void testCanTransmitCount() throws Exception;
+    public abstract void testCanTransmitCount();
 
     @Test(expected = UnsupportedOperationException.class)
-    public abstract void testTransmit() throws Exception;
+    public abstract void testTransmit();
 
     @Test
-    public void testAsIterable() throws Exception {
+    public void testAsIterable() {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         final Collection<ConnectionEntry> entries = queue.drain();
-        Assert.assertEquals(sentMessages, entries.size());
-        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+        assertEquals(sentMessages, entries.size());
+        assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
     @Test
-    public void testTicksStalling() throws Exception {
+    public void testTicksStalling() {
         final long now = Ticker.systemTicker().read();
-        Assert.assertEquals(0, queue.ticksStalling(now));
+        assertEquals(0, queue.ticksStalling(now));
     }
 
     @Test
-    public void testCompleteReponseNotMatchingRequest() throws Exception {
+    public void testCompleteReponseNotMatchingRequest() {
         final long requestSequence = 0L;
         final long txSequence = 0L;
         final long sessionId = 0L;
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         //different transaction id
         final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
         final Optional<TransmittedConnectionEntry> completed1 =
                 queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed1.isPresent());
+        assertFalse(completed1.isPresent());
         //different response sequence
         final long differentResponseSequence = 1L;
         final RequestSuccess<?, ?> success2 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
         final Optional<TransmittedConnectionEntry> completed2 =
                 queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed2.isPresent());
+        assertFalse(completed2.isPresent());
         //different tx sequence
         final long differentTxSequence = 1L;
         final RequestSuccess<?, ?> success3 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed3 =
                 queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed3.isPresent());
+        assertFalse(completed3.isPresent());
         //different session id
         final long differentSessionId = 1L;
         final RequestSuccess<?, ?> success4 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed4 =
                 queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed4.isPresent());
+        assertFalse(completed4.isPresent());
     }
 
     @Test
-    public void testIsEmpty() throws Exception {
-        Assert.assertTrue(queue.isEmpty());
+    public void testIsEmpty() {
+        assertTrue(queue.isEmpty());
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
-        Assert.assertFalse(queue.isEmpty());
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
+        assertFalse(queue.isEmpty());
     }
 
     @Test
-    public void testPeek() throws Exception {
+    public void testPeek() {
         final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
         final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
-        queue.enqueue(entry1, now);
-        queue.enqueue(entry2, now);
-        Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+        queue.enqueueOrForward(entry1, now);
+        queue.enqueueOrForward(entry2, now);
+        assertEquals(entry1.getRequest(), queue.peek().getRequest());
     }
 
     @Test
-    public void testPoison() throws Exception {
+    public void testPoison() {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
-        queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
-        verify(callback).accept(any(TransactionFailure.class));
-        Assert.assertTrue(queue.isEmpty());
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
+        assertEquals(1, queue.poison().size());
     }
 
     @SuppressWarnings("unchecked")