CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
index 4f00ed5f4bcfe4e88908d9bf2ca25417c3658e18..11ca195311cdb9014ee956c0d6ab54532b1639cc 100644 (file)
@@ -16,22 +16,21 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -47,7 +46,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
@@ -55,7 +54,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
@@ -63,29 +62,29 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test
     public void testClose() throws Exception {
-        new TransactionChainProxy(mockActorContext).close();
+        new TransactionChainProxy(mockComponentFactory).close();
 
         verify(mockActorContext, times(1)).broadcast(anyObject());
     }
 
     @Test
     public void testTransactionChainsHaveUniqueId(){
-        TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
-        TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
+        TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
+        TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
 
     @Test
     public void testRateLimitingUsedInReadWriteTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadWriteTransaction();
 
@@ -94,7 +93,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingUsedInWriteOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newWriteOnlyTransaction();
 
@@ -104,20 +103,88 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadOnlyTransaction();
 
         verify(mockActorContext, times(0)).acquireTxCreationPermit();
     }
 
+    /**
+     * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
+     * initiated until the first one completes its read future.
+     */
+    @Test
+    public void testChainedWriteOnlyTransactions() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
+
+        ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
+        doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        writeTx1.ready();
+
+        verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
+
+        ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        expectBatchedModifications(txActorRef2, 1);
+
+        final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
+
+        final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+        final CountDownLatch write2Complete = new CountDownLatch(1);
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+                } catch (Exception e) {
+                    caughtEx.set(e);
+                } finally {
+                    write2Complete.countDown();
+                }
+            }
+        }.start();
+
+        assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        try {
+            verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } catch (AssertionError e) {
+            fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+        }
+
+        batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
+
+        // Tx 2 should've proceeded to find the primary shard.
+        verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+    }
+
     /**
      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
      * initiated until the first one completes its read future.
      */
     @Test
     public void testChainedReadWriteTransactions() throws Exception {
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
@@ -125,7 +192,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
@@ -134,7 +201,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
@@ -176,7 +243,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+        readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
                 eqCreateTransaction(tx2MemberName, READ_WRITE));
@@ -188,7 +255,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();