Always persist and recover election term info
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 5dc8361cc43470b72165dfd2c7b51d6372ad3438..85de4dac7586a6377230c0f4793f9a6b0783316c 100644 (file)
@@ -9,12 +9,14 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.base.Stopwatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
@@ -24,28 +26,38 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class RaftActorRecoverySupport {
-    private final DataPersistenceProvider persistence;
     private final RaftActorContext context;
     private final RaftActorBehavior currentBehavior;
     private final RaftActorRecoveryCohort cohort;
 
     private int currentRecoveryBatchCount;
+    private boolean dataRecoveredWithPersistenceDisabled;
 
     private Stopwatch recoveryTimer;
     private final Logger log;
 
-    RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
-            RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
-        this.persistence = persistence;
+    RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+            RaftActorRecoveryCohort cohort) {
         this.context = context;
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
     }
 
-    boolean handleRecoveryMessage(Object message) {
+    boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+        log.trace("handleRecoveryMessage: {}", message);
+
         boolean recoveryComplete = false;
-        if(persistence.isRecoveryApplicable()) {
+        DataPersistenceProvider persistence = context.getPersistenceProvider();
+        if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
+            // Handle this message for backwards compatibility with pre-Lithium versions.
+            org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
+                    (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
+            context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
+        } else if(persistence.isRecoveryApplicable()) {
             if (message instanceof SnapshotOffer) {
                 onRecoveredSnapshot((SnapshotOffer) message);
             } else if (message instanceof ReplicatedLogEntry) {
@@ -60,15 +72,30 @@ class RaftActorRecoverySupport {
             } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
                 // Handle this message for backwards compatibility with pre-Lithium versions.
                 replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
             } else if (message instanceof RecoveryCompleted) {
                 onRecoveryCompletedMessage();
                 recoveryComplete = true;
             }
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
+
+            if(dataRecoveredWithPersistenceDisabled) {
+                // Data persistence is disabled but we recovered some data entries so we must have just
+                // transitioned to disabled or a persistence backup was restored. Either way, delete all the
+                // messages from the akka journal for efficiency and so that we do not end up with consistency
+                // issues in case persistence is re-enabled.
+                persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+
+                // Delete all the akka snapshots as they will not be needed
+                persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
+                        scala.Long.MaxValue()));
+
+                // Since we cleaned out the journal, we need to re-write the current election info.
+                context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
+                        context.getTermInformation().getVotedFor());
+            }
+        } else {
+            dataRecoveredWithPersistenceDisabled = true;
         }
 
         return recoveryComplete;
@@ -97,7 +124,7 @@ class RaftActorRecoverySupport {
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
@@ -122,17 +149,30 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
+        long lastUnappliedIndex = context.getLastApplied() + 1;
+
         if(log.isDebugEnabled()) {
-            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getId(), context.getLastApplied() + 1, toIndex);
+            // It can happen that lastUnappliedIndex > toIndex if the ApplyJournalEntries is in the persistent
+            // journal but the entry itself had already made it into the state and was recovered via the snapshot.
+            log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+                    context.getId(), lastUnappliedIndex, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
+        long lastApplied = lastUnappliedIndex - 1;
+        for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+            ReplicatedLogEntry logEntry = replicatedLog().get(i);
+            if(logEntry != null) {
+                lastApplied++;
+                batchRecoveredLogEntry(logEntry);
+            } else {
+                // Shouldn't happen but cover it anyway.
+                log.error("Log entry not found for index {}", i);
+                break;
+            }
         }
 
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
+        context.setLastApplied(lastApplied);
+        context.setCommitIndex(lastApplied);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {