Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 5ab1e5ade3fc5a2333fe14f3f31d36733019d133..4fa4fcd1dd3fa728d135fb1936daefcac0649065 100644 (file)
@@ -31,18 +31,22 @@ import akka.dispatch.Futures;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collection;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
@@ -56,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -65,8 +70,8 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -138,7 +143,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
 
-    private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Exception {
+    private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
+            throws Exception {
         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
 
         if (exToThrow instanceof PrimaryNotFoundException) {
@@ -156,7 +162,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
 
-    private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Exception {
+    private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
         testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
     }
 
@@ -355,7 +361,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
                 new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
                     @Override
-                    public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                    public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
                         try {
                             transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
                         } catch (Exception e) {
@@ -366,19 +372,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                     }
 
                     @Override
-                    public void onFailure(Throwable failure) {
+                    public void onFailure(final Throwable failure) {
                         caughtEx.set(failure);
                         readComplete.countDown();
                     }
-                });
+                }, MoreExecutors.directExecutor());
 
         createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
 
         Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
 
-        if (caughtEx.get() != null) {
-            Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
-            Throwables.propagate(caughtEx.get());
+        final Throwable t = caughtEx.get();
+        if (t != null) {
+            Throwables.propagateIfPossible(t, Exception.class);
+            throw new RuntimeException(t);
         }
 
         // This sends the batched modification.
@@ -521,22 +528,59 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     public void testReadyWithMultipleShardWrites() throws Exception {
         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+                TestModel.JUNK_QNAME.getLocalName());
 
         expectBatchedModificationsReady(actorRef1);
         expectBatchedModificationsReady(actorRef2);
 
+        ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
+                .actorSelection(actorRef3.path().toString());
+
+        doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
+                .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
+
+        expectReadyLocalTransaction(actorRef3, false);
+
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
         verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
-                actorSelection(actorRef2));
+                actorSelection(actorRef2), actorSelection(actorRef3));
+
+        SortedSet<String> expShardNames =
+                ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
+                        TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
+
+        ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                batchedMods.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+        batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                batchedMods.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", true,
+                readyLocalTx.getValue().getParticipatingShardNames().isPresent());
+        assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
     }
 
     @Test
@@ -654,6 +698,12 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
         assertTrue(ready instanceof SingleCommitCohortProxy);
         verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
+
+        ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
+        assertEquals("Participating shards present", false,
+                readyLocalTx.getValue().getParticipatingShardNames().isPresent());
     }
 
     @Test
@@ -682,7 +732,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
     }
 
-    private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+    private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
@@ -722,7 +772,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
         ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+                TestModel.JUNK_QNAME.getLocalName());
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
@@ -773,26 +824,27 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         void run(TransactionProxy transactionProxy);
     }
 
-    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef) {
+    private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
     }
 
-    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree) {
+    private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
                 dataTree);
     }
 
-    private void throttleOperation(TransactionProxyOperation operation) {
+    private void throttleOperation(final TransactionProxyOperation operation) {
         throttleOperation(operation, 1, true);
     }
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound) {
+    private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
+            final boolean shardFound) {
         throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
                 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
     }
 
-    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound,
-            long expectedCompletionTime) {
+    private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
+            final boolean shardFound, final long expectedCompletionTime) {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -834,11 +886,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
-    private void completeOperation(TransactionProxyOperation operation) {
+    private void completeOperation(final TransactionProxyOperation operation) {
         completeOperation(operation, true);
     }
 
-    private void completeOperation(TransactionProxyOperation operation, boolean shardFound) {
+    private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -878,7 +930,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 expected, end - start), end - start <= expected);
     }
 
-    private void completeOperationLocal(TransactionProxyOperation operation, DataTree dataTree) {
+    private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -913,14 +965,15 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         return dataTree;
     }
 
-    private static DataTree createDataTree(NormalizedNode<?, ?> readResponse) {
+    private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
         DataTree dataTree = mock(DataTree.class);
         DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
         DataTreeModification dataTreeModification = mock(DataTreeModification.class);
 
         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
         doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
-        doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
+        doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
+            any(YangInstanceIdentifier.class));
 
         return dataTree;
     }
@@ -1238,7 +1291,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 .getOperationTimeoutInMillis()) * 2);
     }
 
-    private void testModificationOperationBatching(TransactionType type) throws Exception {
+    private void testModificationOperationBatching(final TransactionType type) throws Exception {
         int shardBatchedModificationCount = 3;
         dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
 
@@ -1448,7 +1501,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
 
-    private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
+    private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));