BUG 2134 : Make persistence configurable at the datastore level 52/12352/5
authorMoiz Raja <moraja@cisco.com>
Fri, 17 Oct 2014 23:42:41 +0000 (16:42 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 30 Oct 2014 20:59:57 +0000 (20:59 +0000)
- Added a peristent flag in the config sub-system
- If persistent in not true then we ignore recovery messages and not use the
  akka persistence apis for persisting anything
- The unit tests
    - assume that persistence is on by default (which it is)
    - test that if recovery is applicable only then the ShardManager and RaftActor process the recovoery messages.
    - test that when persisting the data-persistence API is used in the appropriate places (see RaftActorTest/ShardManagerTest)

Change-Id: I19913bcd32e609ccde6ad8e35209788315504426
Signed-off-by: Moiz Raja <moraja@cisco.com>
23 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 97b912e..06538fd 100644 (file)
@@ -11,10 +11,9 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
-
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
-
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -38,6 +37,7 @@ import java.util.Map;
 public class ExampleActor extends RaftActor {
 
     private final Map<String, String> state = new HashMap();
+    private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
 
@@ -45,6 +45,7 @@ public class ExampleActor extends RaftActor {
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
+        this.dataPersistenceProvider = new PersistentDataProvider();
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
@@ -57,7 +58,7 @@ public class ExampleActor extends RaftActor {
         });
     }
 
-    @Override public void onReceiveCommand(Object message){
+    @Override public void onReceiveCommand(Object message) throws Exception{
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -160,7 +161,12 @@ public class ExampleActor extends RaftActor {
 
     }
 
-    @Override public void onReceiveRecover(Object message) {
+    @Override
+    protected DataPersistenceProvider persistence() {
+        return dataPersistenceProvider;
+    }
+
+    @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
 
index 66a46ef..2459c2f 100644 (file)
@@ -18,10 +18,11 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -38,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+
 import java.io.Serializable;
 import java.util.Map;
 
@@ -81,7 +83,7 @@ import java.util.Map;
  * <li> when a snapshot should be saved </li>
  * </ul>
  */
-public abstract class RaftActor extends UntypedPersistentActor {
+public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
@@ -135,24 +137,40 @@ public abstract class RaftActor extends UntypedPersistentActor {
     public void preStart() throws Exception {
         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
+
         super.preStart();
     }
 
     @Override
-    public void onReceiveRecover(Object message) {
-        if (message instanceof SnapshotOffer) {
-            onRecoveredSnapshot((SnapshotOffer)message);
-        } else if (message instanceof ReplicatedLogEntry) {
-            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
-        } else if (message instanceof ApplyLogEntries) {
-            onRecoveredApplyLogEntries((ApplyLogEntries)message);
-        } else if (message instanceof DeleteEntries) {
-            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
-        } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                    ((UpdateElectionTerm) message).getVotedFor());
-        } else if (message instanceof RecoveryCompleted) {
-            onRecoveryCompletedMessage();
+    public void handleRecover(Object message) {
+        if(persistence().isRecoveryApplicable()) {
+            if (message instanceof SnapshotOffer) {
+                onRecoveredSnapshot((SnapshotOffer) message);
+            } else if (message instanceof ReplicatedLogEntry) {
+                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+            } else if (message instanceof ApplyLogEntries) {
+                onRecoveredApplyLogEntries((ApplyLogEntries) message);
+            } else if (message instanceof DeleteEntries) {
+                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof UpdateElectionTerm) {
+                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                        ((UpdateElectionTerm) message).getVotedFor());
+            } else if (message instanceof RecoveryCompleted) {
+                onRecoveryCompletedMessage();
+            }
+        } else {
+            if (message instanceof RecoveryCompleted) {
+                // Delete all the messages from the akka journal so that we do not end up with consistency issues
+                // Note I am not using the dataPersistenceProvider and directly using the akka api here
+                deleteMessages(lastSequenceNr());
+
+                // Delete all the akka snapshots as they will not be needed
+                deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
+
+                onRecoveryComplete();
+                currentBehavior = new Follower(context);
+                onStateChanged();
+            }
         }
     }
 
@@ -254,7 +272,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         onStateChanged();
     }
 
-    @Override public void onReceiveCommand(Object message) {
+    @Override public void handleCommand(Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -272,7 +290,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
             }
-            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
                 public void apply(ApplyLogEntries param) throws Exception {
                 }
@@ -304,10 +322,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
             LOG.info("SaveSnapshotSuccess received for snapshot");
 
-            context.getReplicatedLog().snapshotCommit();
+            long sequenceNumber = success.metadata().sequenceNr();
 
-            // TODO: Not sure if we want to be this aggressive with trimming stuff
-            trimPersistentData(success.metadata().sequenceNr());
+            commitSnapshot(sequenceNumber);
 
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
@@ -485,7 +502,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
+    protected void commitSnapshot(long sequenceNumber) {
+        context.getReplicatedLog().snapshotCommit();
 
+        // TODO: Not sure if we want to be this aggressive with trimming stuff
+        trimPersistentData(sequenceNumber);
+    }
 
     /**
      * The applyState method will be called by the RaftActor when some data
@@ -515,7 +537,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     /**
      * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     * is called 1 or more times after {@link #startLogRecoveryBatch}.
      *
      * @param data the state data
      */
@@ -530,7 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
      */
     protected abstract void applyCurrentLogRecoveryBatch();
 
@@ -566,17 +588,19 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
+    protected abstract DataPersistenceProvider persistence();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
-        deleteSnapshots(new SnapshotSelectionCriteria(
+        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
         // Trim akka journal
-        deleteMessages(sequenceNumber);
+        persistence().deleteMessages(sequenceNumber);
     }
 
     private String getLeaderAddress(){
@@ -605,7 +629,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-        saveSnapshot(sn);
+        persistence().saveSnapshot(sn);
 
         LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
 
@@ -647,7 +671,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
 
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
@@ -677,7 +701,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // persist call and the execution(s) of the associated event
             // handler. This also holds for multiple persist calls in context
             // of a single command.
-            persist(replicatedLogEntry,
+            persistence().persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
@@ -723,7 +747,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     }
 
-    private static class DeleteEntries implements Serializable {
+    static class DeleteEntries implements Serializable {
         private final int fromIndex;
 
 
@@ -766,7 +790,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void updateAndPersist(long currentTerm, String votedFor){
             update(currentTerm, votedFor);
             // FIXME : Maybe first persist then update the state
-            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+            persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
 
                 @Override public void apply(UpdateElectionTerm param)
                     throws Exception {
@@ -776,7 +800,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
-    private static class UpdateElectionTerm implements Serializable {
+    static class UpdateElectionTerm implements Serializable {
         private final long currentTerm;
         private final String votedFor;
 
@@ -794,4 +818,29 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
+
+        public NonPersistentRaftDataProvider(){
+
+        }
+
+        /**
+         * The way snapshotting works is,
+         * <ol>
+         * <li> RaftActor calls createSnapshot on the Shard
+         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
+         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
+         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
+         * in SaveSnapshotSuccess.
+         * </ol>
+         * @param o
+         */
+        @Override
+        public void saveSnapshot(Object o) {
+            // Make saving Snapshot successful
+            commitSnapshot(-1L);
+        }
+    }
+
 }
index c15c919..69af77c 100644 (file)
@@ -7,19 +7,29 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.duration.FiniteDuration;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -32,7 +42,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -45,30 +58,42 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
+        private final DataPersistenceProvider dataPersistenceProvider;
+
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private final Map<String, String> peerAddresses;
             private final String id;
             private final Optional<ConfigParams> config;
+            private final DataPersistenceProvider dataPersistenceProvider;
 
             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                    Optional<ConfigParams> config) {
+                    Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
                 this.peerAddresses = peerAddresses;
                 this.id = id;
                 this.config = config;
+                this.dataPersistenceProvider = dataPersistenceProvider;
             }
 
             @Override
             public MockRaftActor create() throws Exception {
-                return new MockRaftActor(id, peerAddresses, config);
+                return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
             }
         }
 
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
+        private final CountDownLatch applyStateLatch = new CountDownLatch(1);
+
         private final List<Object> state;
 
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
+            if(dataPersistenceProvider == null){
+                this.dataPersistenceProvider = new PersistentDataProvider();
+            } else {
+                this.dataPersistenceProvider = dataPersistenceProvider;
+            }
         }
 
         public void waitForRecoveryComplete() {
@@ -79,16 +104,27 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+        public CountDownLatch getApplyRecoverySnapshotLatch(){
+            return applyRecoverySnapshot;
+        }
+
         public List<Object> getState() {
             return state;
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                 Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                                  Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+        }
+
+
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+            applyStateLatch.countDown();
         }
 
         @Override
@@ -111,6 +147,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override
         protected void applyRecoverySnapshot(ByteString snapshot) {
+            applyRecoverySnapshot.countDown();
             try {
                 Object data = toObject(snapshot);
                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
@@ -123,7 +160,6 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void createSnapshot() {
-            throw new UnsupportedOperationException("createSnapshot");
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
@@ -132,6 +168,11 @@ public class RaftActorTest extends AbstractActorTest {
         @Override protected void onStateChanged() {
         }
 
+        @Override
+        protected DataPersistenceProvider persistence() {
+            return this.dataPersistenceProvider;
+        }
+
         @Override public String persistenceId() {
             return this.getId();
         }
@@ -155,6 +196,9 @@ public class RaftActorTest extends AbstractActorTest {
             return obj;
         }
 
+        public ReplicatedLog getReplicatedLog(){
+            return this.getRaftActorContext().getReplicatedLog();
+        }
 
     }
 
@@ -294,6 +338,343 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    /**
+     * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
+     * process recovery messages
+     *
+     * @throws Exception
+     */
+
+    @Test
+    public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+                assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+                assertEquals("add replicated log entry", 1, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+                assertEquals("add replicated log entry", 2, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+                assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+                // The snapshot had 4 items + we added 2 more items during the test
+                // We start removing from 5 and we should get 1 item in the replicated log
+                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
+
+                assertEquals("remove log entries", 1, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+                assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+                assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }};
+    }
+
+    /**
+     * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
+     * not process recovery messages
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+                assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+                assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+                assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+                assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
+
+                assertEquals("remove log entries", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+                assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+                assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
+
+    @Test
+    public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(2);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
+
+                mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testApplyLogEntriesCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+                assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
+                CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
+                dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
+
+                assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
+
+                assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 2f53d4a..8a45108 100644 (file)
@@ -15,6 +15,7 @@ akka {
         }
 
         serialization-bindings {
+            "org.opendaylight.controller.cluster.common.actor.Monitor" = java
             "org.opendaylight.controller.cluster.raft.client.messages.FindLeader" = java
             "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
             "com.google.protobuf.Message" = proto
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java
new file mode 100644 (file)
index 0000000..db4bf31
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+
+/**
+ * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
+ * API.
+ */
+public interface DataPersistenceProvider {
+    /**
+     * @return false if recovery is not applicable. In that case the provider is not persistent and may not have
+     * anything to be recovered
+     */
+    boolean isRecoveryApplicable();
+
+    /**
+     * Persist a journal entry.
+     *
+     * @param o
+     * @param procedure
+     * @param <T>
+     */
+    <T> void persist(T o, Procedure<T> procedure);
+
+    /**
+     * Save a snapshot
+     *
+     * @param o
+     */
+    void saveSnapshot(Object o);
+
+    /**
+     * Delete snapshots based on the criteria
+     *
+     * @param criteria
+     */
+    void deleteSnapshots(SnapshotSelectionCriteria criteria);
+
+    /**
+     * Delete journal entries up to the sequence number
+     *
+     * @param sequenceNumber
+     */
+    void deleteMessages(long sequenceNumber);
+
+}
index 36b2866..8a6217d 100644 (file)
@@ -10,7 +10,10 @@ package org.opendaylight.controller.cluster.common.actor;
 
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 
 public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
 
@@ -67,4 +70,71 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
         }
         unhandled(message);
     }
+
+    protected class PersistentDataProvider implements DataPersistenceProvider {
+
+        public PersistentDataProvider(){
+
+        }
+
+        @Override
+        public boolean isRecoveryApplicable() {
+            return true;
+        }
+
+        @Override
+        public <T> void persist(T o, Procedure<T> procedure) {
+            AbstractUntypedPersistentActor.this.persist(o, procedure);
+        }
+
+        @Override
+        public void saveSnapshot(Object o) {
+            AbstractUntypedPersistentActor.this.saveSnapshot(o);
+        }
+
+        @Override
+        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+            AbstractUntypedPersistentActor.this.deleteSnapshots(criteria);
+        }
+
+        @Override
+        public void deleteMessages(long sequenceNumber) {
+            AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber);
+        }
+    }
+
+    protected class NonPersistentDataProvider implements DataPersistenceProvider {
+
+        public NonPersistentDataProvider(){
+
+        }
+
+        @Override
+        public boolean isRecoveryApplicable() {
+            return false;
+        }
+
+        @Override
+        public <T> void persist(T o, Procedure<T> procedure) {
+            try {
+                procedure.apply(o);
+            } catch (Exception e) {
+                LOG.error(e, "An unexpected error occurred");
+            }
+        }
+
+        @Override
+        public void saveSnapshot(Object o) {
+        }
+
+        @Override
+        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+
+        }
+
+        @Override
+        public void deleteMessages(long sequenceNumber) {
+
+        }
+    }
 }
index b2a43c0..88ce791 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 
-public class Monitor {
+import java.io.Serializable;
+
+public class Monitor implements Serializable {
     private final ActorRef actorRef;
 
     public Monitor(ActorRef actorRef){
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java
new file mode 100644 (file)
index 0000000..33d4056
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is intended for testing purposes. It just triggers CountDownLatch's in each method.
+ * This class really should be under src/test/java but it was problematic trying to uses it in other projects.
+ */
+public class DataPersistenceProviderMonitor implements DataPersistenceProvider {
+
+    private CountDownLatch persistLatch = new CountDownLatch(1);
+    private CountDownLatch saveSnapshotLatch = new CountDownLatch(1);
+    private CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);;
+    private CountDownLatch deleteMessagesLatch = new CountDownLatch(1);;
+
+    @Override
+    public boolean isRecoveryApplicable() {
+        return false;
+    }
+
+    @Override
+    public <T> void persist(T o, Procedure<T> procedure) {
+        persistLatch.countDown();
+    }
+
+    @Override
+    public void saveSnapshot(Object o) {
+        saveSnapshotLatch.countDown();
+    }
+
+    @Override
+    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+        deleteSnapshotsLatch.countDown();
+    }
+
+    @Override
+    public void deleteMessages(long sequenceNumber) {
+        deleteMessagesLatch.countDown();
+    }
+
+    public void setPersistLatch(CountDownLatch persistLatch) {
+        this.persistLatch = persistLatch;
+    }
+
+    public void setSaveSnapshotLatch(CountDownLatch saveSnapshotLatch) {
+        this.saveSnapshotLatch = saveSnapshotLatch;
+    }
+
+    public void setDeleteSnapshotsLatch(CountDownLatch deleteSnapshotsLatch) {
+        this.deleteSnapshotsLatch = deleteSnapshotsLatch;
+    }
+
+    public void setDeleteMessagesLatch(CountDownLatch deleteMessagesLatch) {
+        this.deleteMessagesLatch = deleteMessagesLatch;
+    }
+}
index 03d331b..2048bde 100644 (file)
@@ -8,12 +8,15 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
+import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import akka.util.Timeout;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
+
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -32,12 +35,15 @@ public class DatastoreContext {
     private final int shardTransactionCommitQueueCapacity;
     private final Timeout shardInitializationTimeout;
     private final Timeout shardLeaderElectionTimeout;
+    private final boolean persistent;
+    private final ConfigurationReader configurationReader;
 
     private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
             ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
             Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
             int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
-            Timeout shardLeaderElectionTimeout) {
+            Timeout shardLeaderElectionTimeout,
+            boolean persistent, ConfigurationReader configurationReader) {
         this.dataStoreProperties = dataStoreProperties;
         this.shardRaftConfig = shardRaftConfig;
         this.dataStoreMXBeanType = dataStoreMXBeanType;
@@ -47,6 +53,8 @@ public class DatastoreContext {
         this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
         this.shardInitializationTimeout = shardInitializationTimeout;
         this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
+        this.persistent = persistent;
+        this.configurationReader = configurationReader;
     }
 
     public static Builder newBuilder() {
@@ -89,6 +97,14 @@ public class DatastoreContext {
         return shardLeaderElectionTimeout;
     }
 
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public ConfigurationReader getConfigurationReader() {
+        return configurationReader;
+    }
+
     public static class Builder {
         private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
         private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
@@ -101,6 +117,8 @@ public class DatastoreContext {
         private int shardTransactionCommitQueueCapacity = 20000;
         private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
         private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
+        private boolean persistent = true;
+        private ConfigurationReader configurationReader = new FileConfigurationReader();
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -157,6 +175,17 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder configurationReader(ConfigurationReader configurationReader){
+            this.configurationReader = configurationReader;
+            return this;
+        }
+
+
+        public Builder persistent(boolean persistent){
+            this.persistent = persistent;
+            return this;
+        }
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
@@ -167,7 +196,8 @@ public class DatastoreContext {
             return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
                     operationTimeoutInSeconds, shardTransactionIdleTimeout,
                     shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
-                    shardInitializationTimeout, shardLeaderElectionTimeout);
+                    shardInitializationTimeout, shardLeaderElectionTimeout,
+                    persistent, configurationReader);
         }
     }
 }
index a6f187d..004faf2 100644 (file)
@@ -11,27 +11,26 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 
-import java.io.File;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class DistributedDataStoreFactory {
 
-    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
     public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+
     public static final String CONFIGURATION_NAME = "odl-cluster-data";
-    private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
+    private static AtomicReference<ActorSystem> persistentActorSystem = new AtomicReference<>();
 
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
                                                       DatastoreContext datastoreContext, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = getOrCreateInstance(bundleContext);
+        ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
                 new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
@@ -42,27 +41,26 @@ public class DistributedDataStoreFactory {
         return dataStore;
     }
 
-    synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+    synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
+
+        AtomicReference<ActorSystem> actorSystemReference = persistentActorSystem;
+        String configurationName = CONFIGURATION_NAME;
+        String actorSystemName = ACTOR_SYSTEM_NAME;
 
-        if (actorSystem.get() != null){
-            return actorSystem.get();
+        if (actorSystemReference.get() != null){
+            return actorSystemReference.get();
         }
+
         // Create an OSGi bundle classloader for actor system
         BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
                 Thread.currentThread().getContextClassLoader());
 
-        ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
-                ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+        ActorSystem system = ActorSystem.create(actorSystemName,
+                ConfigFactory.load(configurationReader.read()).getConfig(configurationName), classLoader);
         system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
 
-        actorSystem.set(system);
+        actorSystemReference.set(system);
         return system;
     }
 
-
-    private static final Config readAkkaConfiguration() {
-        File defaultConfigFile = new File(AKKA_CONF_PATH);
-        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
-        return ConfigFactory.parseFile(defaultConfigFile);
-    }
 }
index 770cdec..7d67e08 100644 (file)
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
@@ -104,10 +105,6 @@ public class Shard extends RaftActor {
     private final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-    // By default persistent will be true and can be turned off using the system
-    // property shard.persistent
-    private final boolean persistent;
-
     /// The name of this shard
     private final ShardIdentifier name;
 
@@ -120,6 +117,8 @@ public class Shard extends RaftActor {
 
     private final DatastoreContext datastoreContext;
 
+    private final DataPersistenceProvider dataPersistenceProvider;
+
     private SchemaContext schemaContext;
 
     private ActorRef createSnapshotTransaction;
@@ -148,12 +147,9 @@ public class Shard extends RaftActor {
         this.name = name;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
+        this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
 
-        String setting = System.getProperty("shard.persistent");
-
-        this.persistent = !"false".equals(setting);
-
-        LOG.info("Shard created : {} persistent : {}", name, persistent);
+        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
@@ -211,7 +207,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(Object message) {
+    public void onReceiveRecover(Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -230,7 +226,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveCommand(Object message) {
+    public void onReceiveCommand(Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
@@ -308,12 +304,8 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            if(persistent) {
-                Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
-            } else {
-                Shard.this.finishCommit(getSender(), transactionID);
-            }
+            Shard.this.persistData(getSender(), transactionID,
+                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -843,6 +835,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected DataPersistenceProvider persistence() {
+        return dataPersistenceProvider;
+    }
+
     @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.setLeader(newLeader);
     }
@@ -851,6 +848,11 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
+    @VisibleForTesting
+    DataPersistenceProvider getDataPersistenceProvider() {
+        return dataPersistenceProvider;
+    }
+
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
index e861165..c7213e6 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -44,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -91,6 +93,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Collection<String> knownModules = new HashSet<>(128);
 
+    private final DataPersistenceProvider dataPersistenceProvider;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
@@ -102,6 +106,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
+        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -109,6 +114,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
+    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+        return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+    }
+
     public static Props props(final String type,
         final ClusterWrapper cluster,
         final Configuration configuration,
@@ -170,18 +179,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     protected void handleRecover(Object message) throws Exception {
-        if(message instanceof SchemaContextModules){
-            SchemaContextModules msg = (SchemaContextModules) message;
-            knownModules.clear();
-            knownModules.addAll(msg.getModules());
-        } else if(message instanceof RecoveryFailure){
-            RecoveryFailure failure = (RecoveryFailure) message;
-            LOG.error(failure.cause(), "Recovery failed");
-        } else if(message instanceof RecoveryCompleted){
-            LOG.info("Recovery complete : {}", persistenceId());
-
-            // Delete all the messages from the akka journal except the last one
-            deleteMessages(lastSequenceNr() - 1);
+        if(dataPersistenceProvider.isRecoveryApplicable()) {
+            if (message instanceof SchemaContextModules) {
+                SchemaContextModules msg = (SchemaContextModules) message;
+                knownModules.clear();
+                knownModules.addAll(msg.getModules());
+            } else if (message instanceof RecoveryFailure) {
+                RecoveryFailure failure = (RecoveryFailure) message;
+                LOG.error(failure.cause(), "Recovery failed");
+            } else if (message instanceof RecoveryCompleted) {
+                LOG.info("Recovery complete : {}", persistenceId());
+
+                // Delete all the messages from the akka journal except the last one
+                deleteMessages(lastSequenceNr() - 1);
+            }
+        } else {
+            if (message instanceof RecoveryCompleted) {
+                LOG.info("Recovery complete : {}", persistenceId());
+
+                // Delete all the messages from the akka journal
+                deleteMessages(lastSequenceNr());
+            }
         }
     }
 
@@ -262,15 +280,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             knownModules.clear();
             knownModules.addAll(newModules);
 
-            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
                 @Override
                 public void apply(SchemaContextModules param) throws Exception {
                     LOG.info("Sending new SchemaContext to Shards");
                     for (ShardInformation info : localShards.values()) {
-                        if(info.getActor() == null) {
+                        if (info.getActor() == null) {
                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                    info.getPeerAddresses(), datastoreContext, schemaContext),
+                                            info.getPeerAddresses(), datastoreContext, schemaContext),
                                     info.getShardId().toString()));
                         } else {
                             info.getActor().tell(message, getSelf());
@@ -430,6 +448,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return knownModules;
     }
 
+    @VisibleForTesting
+    DataPersistenceProvider getDataPersistenceProvider() {
+        return dataPersistenceProvider;
+    }
+
     private class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationReader.java
new file mode 100644 (file)
index 0000000..12afdbd
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.typesafe.config.Config;
+
+public interface ConfigurationReader {
+    Config read();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/FileConfigurationReader.java
new file mode 100644 (file)
index 0000000..fb84734
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.File;
+
+public class FileConfigurationReader implements ConfigurationReader{
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+
+    @Override
+    public Config read() {
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ResourceConfigurationReader.java
new file mode 100644 (file)
index 0000000..df17f97
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class ResourceConfigurationReader implements ConfigurationReader {
+    @Override
+    public Config read() {
+        return ConfigFactory.load();
+    }
+}
index a675b40..2f3fbdc 100644 (file)
@@ -62,6 +62,7 @@ public class DistributedConfigDataStoreProviderModule extends
                         props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+                .persistent(props.getPersistent().booleanValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
index 21cb799..ecb3a91 100644 (file)
@@ -62,6 +62,7 @@ public class DistributedOperationalDataStoreProviderModule extends
                         props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+                .persistent(props.getPersistent().booleanValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("operational",
index ef9da94..995e98f 100644 (file)
@@ -147,6 +147,12 @@ module distributed-datastore-provider {
              type non-zero-uint32-type;
              description "Max queue size that an actor's mailbox can reach";
          }
+
+         leaf persistent {
+            default true;
+            type boolean;
+            description "Enable or disable data persistence";
+         }
     }
 
     // Augments the 'configuration' choice node under modules/module.
index 5a45a99..cec7ce1 100644 (file)
@@ -6,8 +6,6 @@ import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -36,6 +34,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     private final DatastoreContext.Builder datastoreContextBuilder =
@@ -43,7 +44,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem()) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
@@ -60,7 +60,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem()) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
index c04dcf1..f6eb6d7 100644 (file)
@@ -2,18 +2,19 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
-import akka.japi.Creator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
@@ -33,6 +34,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
@@ -40,7 +42,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -347,6 +351,79 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRecoveryApplicable(){
+        new JavaTestKit(getSystem()) {
+            {
+                final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).build());
+                final TestActorRef<ShardManager> persistentShardManager =
+                        TestActorRef.create(getSystem(), persistentProps);
+
+                DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+                assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
+
+                final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(false).build());
+                final TestActorRef<ShardManager> nonPersistentShardManager =
+                        TestActorRef.create(getSystem(), nonPersistentProps);
+
+                DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+                assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
+
+
+            }};
+
+    }
+
+    @Test
+    public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+            throws Exception {
+        final CountDownLatch persistLatch = new CountDownLatch(1);
+        final Creator<ShardManager> creator = new Creator<ShardManager>() {
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+                    @Override
+                    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+                        DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+                                = new DataPersistenceProviderMonitor();
+                        dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+                        return dataPersistenceProviderMonitor;
+                    }
+                };
+            }
+        };
+
+        new JavaTestKit(getSystem()) {{
+
+            final TestActorRef<ShardManager> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
+
+            ModuleIdentifier foo = mock(ModuleIdentifier.class);
+            when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+            Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+            moduleIdentifierSet.add(foo);
+
+            SchemaContext schemaContext = mock(SchemaContext.class);
+            when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+            shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+            assertEquals("Persisted", true,
+                    Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
+
+        }};
+    }
+
+
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
@@ -387,4 +464,17 @@ public class ShardManagerTest extends AbstractActorTest {
         }
 
     }
+
+    private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+        private Creator<ShardManager> delegate;
+
+        public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public ShardManager create() throws Exception {
+            return delegate.create();
+        }
+    }
 }
index 03a18ea..cd8a658 100644 (file)
@@ -79,6 +79,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -89,15 +90,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
 
 public class ShardTest extends AbstractActorTest {
 
@@ -114,8 +118,6 @@ public class ShardTest extends AbstractActorTest {
 
     @Before
     public void setUp() {
-        System.setProperty("shard.persistent", "false");
-
         InMemorySnapshotStore.clear();
         InMemoryJournal.clear();
     }
@@ -187,7 +189,7 @@ public class ShardTest extends AbstractActorTest {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             dataStoreContext, SCHEMA_CONTEXT) {
                         @Override
-                        public void onReceiveCommand(final Object message) {
+                        public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
                                 // Got the first ElectionTimeout. We don't forward it to the
                                 // base Shard yet until we've sent the RegisterChangeListener
@@ -306,7 +308,7 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
-    public void testPeerAddressResolved(){
+    public void testPeerAddressResolved() throws Exception {
         new ShardTestKit(getSystem()) {{
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
@@ -352,7 +354,7 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
-    public void testApplySnapshot() throws ExecutionException, InterruptedException {
+    public void testApplySnapshot() throws Exception {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
@@ -571,7 +573,6 @@ public class ShardTest extends AbstractActorTest {
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -915,7 +916,6 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testAbortBeforeFinishCommit() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1196,6 +1196,18 @@ public class ShardTest extends AbstractActorTest {
 
     @Test
     public void testCreateSnapshot() throws IOException, InterruptedException {
+            testCreateSnapshot(true, "testCreateSnapshot");
+    }
+
+    @Test
+    public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+        testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+    }
+
+    public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
+        final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
             Creator<Shard> creator = new Creator<Shard>() {
@@ -1204,8 +1216,8 @@ public class ShardTest extends AbstractActorTest {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             dataStoreContext, SCHEMA_CONTEXT) {
                         @Override
-                        public void saveSnapshot(Object snapshot) {
-                            super.saveSnapshot(snapshot);
+                        protected void commitSnapshot(long sequenceNumber) {
+                            super.commitSnapshot(sequenceNumber);
                             latch.get().countDown();
                         }
                     };
@@ -1213,7 +1225,7 @@ public class ShardTest extends AbstractActorTest {
             };
 
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+                    Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
 
@@ -1262,6 +1274,41 @@ public class ShardTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testRecoveryApplicable(){
+
+        final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
+
+        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                persistentContext, SCHEMA_CONTEXT);
+
+        final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
+
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                nonPersistentContext, SCHEMA_CONTEXT);
+
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+                    persistentProps, "testPersistence1");
+
+            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+                    nonPersistentProps, "testPersistence2");
+
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+        }};
+
+    }
+
+
     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
index d08258a..79d5c51 100644 (file)
@@ -7,21 +7,22 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import com.google.common.util.concurrent.Uninterruptibles;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 class ShardTestKit extends JavaTestKit {
 
@@ -67,4 +68,5 @@ class ShardTestKit extends JavaTestKit {
 
         Assert.fail("Leader not found for shard " + shard.path());
     }
+
 }
\ No newline at end of file
index 9e0bba4..35f346f 100644 (file)
@@ -368,7 +368,6 @@ public class TransactionProxyTest {
             future.checkedGet(5, TimeUnit.SECONDS);
             fail("Expected ReadFailedException");
         } catch(ReadFailedException e) {
-            e.printStackTrace();
             throw e.getCause();
         }
     }