CDS: Retry remote front-end transactions on AskTimeoutException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
index acba775445879d5a1305969cc099fc6fd35b2cb8..03e4a16e679dd50de0fe87aa2a177d80cf75cb9e 100644 (file)
@@ -12,29 +12,27 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
+import akka.util.Timeout;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -50,7 +48,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
@@ -58,7 +56,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
@@ -66,29 +64,29 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test
     public void testClose() throws Exception {
-        new TransactionChainProxy(mockActorContext).close();
+        new TransactionChainProxy(mockComponentFactory).close();
 
         verify(mockActorContext, times(1)).broadcast(anyObject());
     }
 
     @Test
     public void testTransactionChainsHaveUniqueId(){
-        TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
-        TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
+        TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
+        TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
 
     @Test
     public void testRateLimitingUsedInReadWriteTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadWriteTransaction();
 
@@ -97,7 +95,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingUsedInWriteOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newWriteOnlyTransaction();
 
@@ -107,7 +105,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadOnlyTransaction();
 
@@ -122,7 +120,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     public void testChainedWriteOnlyTransactions() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
 
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
 
@@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+        batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         // Tx 2 should've proceeded to find the primary shard.
         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -188,7 +186,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
      */
     @Test
     public void testChainedReadWriteTransactions() throws Exception {
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
@@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
@@ -205,10 +203,9 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
 
-        String tx2MemberName = "tx2MemberName";
-        doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
+        String tx2MemberName = "mock-member";
         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
@@ -247,10 +244,10 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+        readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
-                eqCreateTransaction(tx2MemberName, READ_WRITE));
+                eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
     }
 
     @Test(expected=IllegalStateException.class)
@@ -259,7 +256,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();