BUG 2486 : Optimizations for a single node cluster deployment
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 03a18ea6c38c39600f2c4bd3a49b848ace89a09d..2c526288b5ff9353145d4db6dd51d581783111d9 100644 (file)
@@ -1,5 +1,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -16,8 +27,19 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
@@ -54,6 +76,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -79,25 +102,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.inOrder;
 
 public class ShardTest extends AbstractActorTest {
 
@@ -114,8 +118,6 @@ public class ShardTest extends AbstractActorTest {
 
     @Before
     public void setUp() {
-        System.setProperty("shard.persistent", "false");
-
         InMemorySnapshotStore.clear();
         InMemoryJournal.clear();
     }
@@ -187,7 +189,7 @@ public class ShardTest extends AbstractActorTest {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             dataStoreContext, SCHEMA_CONTEXT) {
                         @Override
-                        public void onReceiveCommand(final Object message) {
+                        public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
                                 // Got the first ElectionTimeout. We don't forward it to the
                                 // base Shard yet until we've sent the RegisterChangeListener
@@ -305,8 +307,9 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
+    @SuppressWarnings("serial")
     @Test
-    public void testPeerAddressResolved(){
+    public void testPeerAddressResolved() throws Exception {
         new ShardTestKit(getSystem()) {{
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
@@ -352,7 +355,7 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
-    public void testApplySnapshot() throws ExecutionException, InterruptedException {
+    public void testApplySnapshot() throws Exception {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
@@ -431,9 +434,9 @@ public class ShardTest extends AbstractActorTest {
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                           SCHEMA_CONTEXT))));
 
-        int nListEntries = 11;
+        int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries; i++) {
+        for(int i = 1; i <= nListEntries-5; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -444,6 +447,19 @@ public class ShardTest extends AbstractActorTest {
                     newPayload(mod)));
         }
 
+        // Add some of the new CompositeModificationByteStringPayload
+        for(int i = 11; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newByteStringPayload(mod)));
+        }
+
+
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
 
@@ -506,7 +522,7 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private CompositeModificationPayload newPayload(Modification... mods) {
+    private CompositeModificationPayload newPayload(final Modification... mods) {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -515,15 +531,25 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
-            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
-            MutableCompositeModification modification) {
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+
+    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final MutableCompositeModification modification) {
         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
     }
 
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
-            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
-            MutableCompositeModification modification,
+    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final MutableCompositeModification modification,
             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
 
         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
@@ -533,14 +559,14 @@ public class ShardTest extends AbstractActorTest {
 
         doAnswer(new Answer<ListenableFuture<Boolean>>() {
             @Override
-            public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
+            public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
                 return realCohort.canCommit();
             }
         }).when(cohort).canCommit();
 
         doAnswer(new Answer<ListenableFuture<Void>>() {
             @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
                 if(preCommit != null) {
                     return preCommit.apply(realCohort);
                 } else {
@@ -551,14 +577,14 @@ public class ShardTest extends AbstractActorTest {
 
         doAnswer(new Answer<ListenableFuture<Void>>() {
             @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
                 return realCohort.commit();
             }
         }).when(cohort).commit();
 
         doAnswer(new Answer<ListenableFuture<Void>>() {
             @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
                 return realCohort.abort();
             }
         }).when(cohort).abort();
@@ -571,7 +597,6 @@ public class ShardTest extends AbstractActorTest {
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -610,7 +635,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -624,10 +650,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -653,12 +681,12 @@ public class ShardTest extends AbstractActorTest {
             class OnFutureComplete extends OnComplete<Object> {
                 private final Class<?> expRespType;
 
-                OnFutureComplete(Class<?> expRespType) {
+                OnFutureComplete(final Class<?> expRespType) {
                     this.expRespType = expRespType;
                 }
 
                 @Override
-                public void onComplete(Throwable error, Object resp) {
+                public void onComplete(final Throwable error, final Object resp) {
                     if(error != null) {
                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
                     } else {
@@ -671,7 +699,7 @@ public class ShardTest extends AbstractActorTest {
                     }
                 }
 
-                void onSuccess(Object resp) throws Exception {
+                void onSuccess(final Object resp) throws Exception {
                 }
             }
 
@@ -681,7 +709,7 @@ public class ShardTest extends AbstractActorTest {
                 }
 
                 @Override
-                public void onComplete(Throwable error, Object resp) {
+                public void onComplete(final Throwable error, final Object resp) {
                     super.onComplete(error, resp);
                     commitLatch.countDown();
                 }
@@ -690,13 +718,13 @@ public class ShardTest extends AbstractActorTest {
             class OnCanCommitFutureComplete extends OnFutureComplete {
                 private final String transactionID;
 
-                OnCanCommitFutureComplete(String transactionID) {
+                OnCanCommitFutureComplete(final String transactionID) {
                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
                     this.transactionID = transactionID;
                 }
 
                 @Override
-                void onSuccess(Object resp) throws Exception {
+                void onSuccess(final Object resp) throws Exception {
                     CanCommitTransactionReply canCommitReply =
                             CanCommitTransactionReply.fromSerializable(resp);
                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
@@ -791,10 +819,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -821,7 +851,7 @@ public class ShardTest extends AbstractActorTest {
             final CountDownLatch latch = new CountDownLatch(1);
             canCommitFuture.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable t, Object resp) {
+                public void onComplete(final Throwable t, final Object resp) {
                     latch.countDown();
                 }
             }, getSystem().dispatcher());
@@ -858,7 +888,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -901,7 +932,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -914,8 +946,8 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
+    @Ignore("This test will work only if replication is turned on. Needs modification due to optimizations added to Shard/RaftActor.")
     public void testAbortBeforeFinishCommit() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -940,7 +972,7 @@ public class ShardTest extends AbstractActorTest {
                             new AbortTransaction(transactionID).toSerializable(), timeout);
                     abortFuture.onComplete(new OnComplete<Object>() {
                         @Override
-                        public void onComplete(Throwable e, Object resp) {
+                        public void onComplete(final Throwable e, final Object resp) {
                             abortComplete.countDown();
                         }
                     }, getSystem().dispatcher());
@@ -954,7 +986,8 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1018,10 +1051,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1080,13 +1115,16 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx.
@@ -1149,10 +1187,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1179,7 +1219,7 @@ public class ShardTest extends AbstractActorTest {
             final CountDownLatch latch = new CountDownLatch(1);
             canCommitFuture.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable t, Object resp) {
+                public void onComplete(final Throwable t, final Object resp) {
                     latch.countDown();
                 }
             }, getSystem().dispatcher());
@@ -1196,6 +1236,19 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testCreateSnapshot() throws IOException, InterruptedException {
+            testCreateSnapshot(true, "testCreateSnapshot");
+    }
+
+    @Test
+    public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+        testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+    }
+
+    @SuppressWarnings("serial")
+    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
+        final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
             Creator<Shard> creator = new Creator<Shard>() {
@@ -1204,8 +1257,8 @@ public class ShardTest extends AbstractActorTest {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             dataStoreContext, SCHEMA_CONTEXT) {
                         @Override
-                        public void saveSnapshot(Object snapshot) {
-                            super.saveSnapshot(snapshot);
+                        protected void commitSnapshot(final long sequenceNumber) {
+                            super.commitSnapshot(sequenceNumber);
                             latch.get().countDown();
                         }
                     };
@@ -1213,7 +1266,7 @@ public class ShardTest extends AbstractActorTest {
             };
 
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+                    Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
 
@@ -1236,8 +1289,7 @@ public class ShardTest extends AbstractActorTest {
      */
     @Test
     public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
-            MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
 
         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
@@ -1247,7 +1299,7 @@ public class ShardTest extends AbstractActorTest {
         commitTransaction(putTransaction);
 
 
-        NormalizedNode expected = readStore(store);
+        NormalizedNode<?, ?> expected = readStore(store);
 
         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
 
@@ -1256,13 +1308,47 @@ public class ShardTest extends AbstractActorTest {
 
         commitTransaction(writeTransaction);
 
-        NormalizedNode actual = readStore(store);
+        NormalizedNode<?, ?> actual = readStore(store);
 
         assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testRecoveryApplicable(){
+
+        final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
+
+        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                persistentContext, SCHEMA_CONTEXT);
+
+        final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
+
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                nonPersistentContext, SCHEMA_CONTEXT);
+
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+                    persistentProps, "testPersistence1");
+
+            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+                    nonPersistentProps, "testPersistence2");
+
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+        }};
 
     }
 
-    private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
+
+    private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
             transaction.read(YangInstanceIdentifier.builder().build());
@@ -1276,7 +1362,7 @@ public class ShardTest extends AbstractActorTest {
         return normalizedNode;
     }
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
         ListenableFuture<Void> future =
             commitCohort.preCommit();
@@ -1292,13 +1378,13 @@ public class ShardTest extends AbstractActorTest {
         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
             public void onDataChanged(
-                AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+                final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
 
             }
         };
     }
 
-    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+    static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
 
@@ -1313,7 +1399,7 @@ public class ShardTest extends AbstractActorTest {
         return node;
     }
 
-    private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
+    private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
         throws ExecutionException, InterruptedException {
         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
 
@@ -1324,10 +1410,11 @@ public class ShardTest extends AbstractActorTest {
         commitCohort.commit().get();
     }
 
+    @SuppressWarnings("serial")
     private static final class DelegatingShardCreator implements Creator<Shard> {
         private final Creator<Shard> delegate;
 
-        DelegatingShardCreator(Creator<Shard> delegate) {
+        DelegatingShardCreator(final Creator<Shard> delegate) {
             this.delegate = delegate;
         }