BUG 2134 : Make persistence configurable at the datastore level
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 66a46ef3bde0ca8a8d1fa8fe1056ccb463a594d6..2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6 100644 (file)
@@ -18,10 +18,11 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -38,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+
 import java.io.Serializable;
 import java.util.Map;
 
@@ -81,7 +83,7 @@ import java.util.Map;
  * <li> when a snapshot should be saved </li>
  * </ul>
  */
-public abstract class RaftActor extends UntypedPersistentActor {
+public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
@@ -135,24 +137,40 @@ public abstract class RaftActor extends UntypedPersistentActor {
     public void preStart() throws Exception {
         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
+
         super.preStart();
     }
 
     @Override
-    public void onReceiveRecover(Object message) {
-        if (message instanceof SnapshotOffer) {
-            onRecoveredSnapshot((SnapshotOffer)message);
-        } else if (message instanceof ReplicatedLogEntry) {
-            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
-        } else if (message instanceof ApplyLogEntries) {
-            onRecoveredApplyLogEntries((ApplyLogEntries)message);
-        } else if (message instanceof DeleteEntries) {
-            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
-        } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                    ((UpdateElectionTerm) message).getVotedFor());
-        } else if (message instanceof RecoveryCompleted) {
-            onRecoveryCompletedMessage();
+    public void handleRecover(Object message) {
+        if(persistence().isRecoveryApplicable()) {
+            if (message instanceof SnapshotOffer) {
+                onRecoveredSnapshot((SnapshotOffer) message);
+            } else if (message instanceof ReplicatedLogEntry) {
+                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+            } else if (message instanceof ApplyLogEntries) {
+                onRecoveredApplyLogEntries((ApplyLogEntries) message);
+            } else if (message instanceof DeleteEntries) {
+                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof UpdateElectionTerm) {
+                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                        ((UpdateElectionTerm) message).getVotedFor());
+            } else if (message instanceof RecoveryCompleted) {
+                onRecoveryCompletedMessage();
+            }
+        } else {
+            if (message instanceof RecoveryCompleted) {
+                // Delete all the messages from the akka journal so that we do not end up with consistency issues
+                // Note I am not using the dataPersistenceProvider and directly using the akka api here
+                deleteMessages(lastSequenceNr());
+
+                // Delete all the akka snapshots as they will not be needed
+                deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
+
+                onRecoveryComplete();
+                currentBehavior = new Follower(context);
+                onStateChanged();
+            }
         }
     }
 
@@ -254,7 +272,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         onStateChanged();
     }
 
-    @Override public void onReceiveCommand(Object message) {
+    @Override public void handleCommand(Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -272,7 +290,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
             }
-            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
                 public void apply(ApplyLogEntries param) throws Exception {
                 }
@@ -304,10 +322,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
             LOG.info("SaveSnapshotSuccess received for snapshot");
 
-            context.getReplicatedLog().snapshotCommit();
+            long sequenceNumber = success.metadata().sequenceNr();
 
-            // TODO: Not sure if we want to be this aggressive with trimming stuff
-            trimPersistentData(success.metadata().sequenceNr());
+            commitSnapshot(sequenceNumber);
 
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
@@ -485,7 +502,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
+    protected void commitSnapshot(long sequenceNumber) {
+        context.getReplicatedLog().snapshotCommit();
 
+        // TODO: Not sure if we want to be this aggressive with trimming stuff
+        trimPersistentData(sequenceNumber);
+    }
 
     /**
      * The applyState method will be called by the RaftActor when some data
@@ -515,7 +537,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     /**
      * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     * is called 1 or more times after {@link #startLogRecoveryBatch}.
      *
      * @param data the state data
      */
@@ -530,7 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
      */
     protected abstract void applyCurrentLogRecoveryBatch();
 
@@ -566,17 +588,19 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
+    protected abstract DataPersistenceProvider persistence();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
-        deleteSnapshots(new SnapshotSelectionCriteria(
+        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
         // Trim akka journal
-        deleteMessages(sequenceNumber);
+        persistence().deleteMessages(sequenceNumber);
     }
 
     private String getLeaderAddress(){
@@ -605,7 +629,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-        saveSnapshot(sn);
+        persistence().saveSnapshot(sn);
 
         LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
 
@@ -647,7 +671,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
 
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
@@ -677,7 +701,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // persist call and the execution(s) of the associated event
             // handler. This also holds for multiple persist calls in context
             // of a single command.
-            persist(replicatedLogEntry,
+            persistence().persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
@@ -723,7 +747,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     }
 
-    private static class DeleteEntries implements Serializable {
+    static class DeleteEntries implements Serializable {
         private final int fromIndex;
 
 
@@ -766,7 +790,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void updateAndPersist(long currentTerm, String votedFor){
             update(currentTerm, votedFor);
             // FIXME : Maybe first persist then update the state
-            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+            persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
 
                 @Override public void apply(UpdateElectionTerm param)
                     throws Exception {
@@ -776,7 +800,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
-    private static class UpdateElectionTerm implements Serializable {
+    static class UpdateElectionTerm implements Serializable {
         private final long currentTerm;
         private final String votedFor;
 
@@ -794,4 +818,29 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
+
+        public NonPersistentRaftDataProvider(){
+
+        }
+
+        /**
+         * The way snapshotting works is,
+         * <ol>
+         * <li> RaftActor calls createSnapshot on the Shard
+         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
+         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
+         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
+         * in SaveSnapshotSuccess.
+         * </ol>
+         * @param o
+         */
+        @Override
+        public void saveSnapshot(Object o) {
+            // Make saving Snapshot successful
+            commitSnapshot(-1L);
+        }
+    }
+
 }