Always persist and recover election term info
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 8cf01f11eb7e95b8944e117d4112600f2c00fd9f..85de4dac7586a6377230c0f4793f9a6b0783316c 100644 (file)
@@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
@@ -28,6 +31,7 @@ class RaftActorRecoverySupport {
     private final RaftActorRecoveryCohort cohort;
 
     private int currentRecoveryBatchCount;
+    private boolean dataRecoveredWithPersistenceDisabled;
 
     private Stopwatch recoveryTimer;
     private final Logger log;
@@ -40,9 +44,20 @@ class RaftActorRecoverySupport {
         this.log = context.getLogger();
     }
 
-    boolean handleRecoveryMessage(Object message) {
+    boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+        log.trace("handleRecoveryMessage: {}", message);
+
         boolean recoveryComplete = false;
-        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+        DataPersistenceProvider persistence = context.getPersistenceProvider();
+        if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
+            // Handle this message for backwards compatibility with pre-Lithium versions.
+            org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
+                    (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
+            context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
+        } else if(persistence.isRecoveryApplicable()) {
             if (message instanceof SnapshotOffer) {
                 onRecoveredSnapshot((SnapshotOffer) message);
             } else if (message instanceof ReplicatedLogEntry) {
@@ -57,20 +72,30 @@ class RaftActorRecoverySupport {
             } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
                 // Handle this message for backwards compatibility with pre-Lithium versions.
                 replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
-            } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
-                        (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
-                context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
             } else if (message instanceof RecoveryCompleted) {
                 onRecoveryCompletedMessage();
                 recoveryComplete = true;
             }
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
+
+            if(dataRecoveredWithPersistenceDisabled) {
+                // Data persistence is disabled but we recovered some data entries so we must have just
+                // transitioned to disabled or a persistence backup was restored. Either way, delete all the
+                // messages from the akka journal for efficiency and so that we do not end up with consistency
+                // issues in case persistence is -re-enabled.
+                persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+
+                // Delete all the akka snapshots as they will not be needed
+                persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
+                        scala.Long.MaxValue()));
+
+                // Since we cleaned out the journal, we need to re-write the current election info.
+                context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
+                        context.getTermInformation().getVotedFor());
+            }
+        } else {
+            dataRecoveredWithPersistenceDisabled = true;
         }
 
         return recoveryComplete;