CDS: Changes to Tx abort in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 83b15b99df43f74e0a8a5d61d7493b1792c17e8b..24a5225b8744edb82981f8eb7dc4ab07eee4a6b4 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -124,29 +125,6 @@ public class ShardTest extends AbstractShardTest {
 
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-    protected Props newShardPropsWithRecoveryComplete() {
-
-        final Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<String,String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-        return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
-    }
-
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -478,9 +456,14 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplySnapshot() throws Exception {
+
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
+        testkit.waitUntilLeader(shard);
+
         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
         store.setSchemaContext(SCHEMA_CONTEXT);
 
@@ -509,8 +492,12 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyState() throws Exception {
 
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
+        testkit.waitUntilLeader(shard);
+
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@@ -527,9 +514,11 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyStateWithCandidatePayload() throws Exception {
 
-        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
-        recoveryComplete.await(5,  TimeUnit.SECONDS);
+        testkit.waitUntilLeader(shard);
 
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
@@ -2129,11 +2118,11 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortTransaction() throws Throwable {
+    public void testAbortCurrentTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortTransaction");
+                    "testAbortCurrentTransaction");
 
             waitUntilLeader(shard);
 
@@ -2195,6 +2184,78 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testAbortQueuedTransaction() throws Throwable {
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+        new ShardTestKit(getSystem()) {{
+            final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+            @SuppressWarnings("serial")
+            final Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            super.onReceiveCommand(message);
+                            if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                                if(cleaupCheckLatch.get() != null) {
+                                    cleaupCheckLatch.get().countDown();
+                                }
+                            }
+                        }
+                    };
+                }
+            };
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx1";
+
+            final MutableCompositeModification modification = new MutableCompositeModification();
+            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+            doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            // Ready the tx.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Send the AbortTransaction message.
+
+            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            verify(cohort).abort();
+
+            // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+            cleaupCheckLatch.set(new CountDownLatch(1));
+            assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+                    cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+            assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Now send CanCommitTransaction - should fail.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+            Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+            assertTrue("Failure type", failure instanceof IllegalStateException);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCreateSnapshot() throws Exception {
         testCreateSnapshot(true, "testCreateSnapshot");