X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupport.java;h=8cf01f11eb7e95b8944e117d4112600f2c00fd9f;hb=214ba02ca4400d88e494fa27a44c30531f68968e;hp=5f33c738e1e44e972b5a7962302f22f177499899;hpb=3b2b1027ba6ab7bb1669466d1b7bdc0249809a08;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5f33c738e1..8cf01f11eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -10,11 +10,10 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; import com.google.common.base.Stopwatch; -import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries; -import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; +import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; @@ -24,7 +23,6 @@ import org.slf4j.Logger; * @author Thomas Pantelis */ class RaftActorRecoverySupport { - private final DataPersistenceProvider persistence; private final RaftActorContext context; private final RaftActorBehavior currentBehavior; private final RaftActorRecoveryCohort cohort; @@ -34,9 +32,8 @@ class RaftActorRecoverySupport { private Stopwatch recoveryTimer; private final Logger log; - RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context, - RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) { - this.persistence = persistence; + RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior, + RaftActorRecoveryCohort cohort) { this.context = context; this.currentBehavior = currentBehavior; this.cohort = cohort; @@ -45,7 +42,7 @@ class RaftActorRecoverySupport { boolean handleRecoveryMessage(Object message) { boolean recoveryComplete = false; - if(persistence.isRecoveryApplicable()) { + if(context.getPersistenceProvider().isRecoveryApplicable()) { if (message instanceof SnapshotOffer) { onRecoveredSnapshot((SnapshotOffer) message); } else if (message instanceof ReplicatedLogEntry) { @@ -57,6 +54,14 @@ class RaftActorRecoverySupport { onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); } else if (message instanceof DeleteEntries) { replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) { + // Handle this message for backwards compatibility with pre-Lithium versions. + replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex()); + } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) { + // Handle this message for backwards compatibility with pre-Lithium versions. + org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update = + (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message; + context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor()); } else if (message instanceof UpdateElectionTerm) { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); @@ -94,7 +99,7 @@ class RaftActorRecoverySupport { // The replicated log can be used later on to retrieve this snapshot // when we need to install it on a peer - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); @@ -104,31 +109,45 @@ class RaftActorRecoverySupport { cohort.applyRecoverySnapshot(snapshot.getState()); timer.stop(); - log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + - replicatedLog().size(), context.getId(), timer.toString(), - replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); + log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}", + context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); } private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(log.isDebugEnabled()) { - log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex()); + log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(), + logEntry.getIndex(), logEntry.size()); } replicatedLog().append(logEntry); } private void onRecoveredApplyLogEntries(long toIndex) { + long lastUnappliedIndex = context.getLastApplied() + 1; + if(log.isDebugEnabled()) { - log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getId(), context.getLastApplied() + 1, toIndex); + // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal + // but the entry itself has made it to that state and recovered via the snapshot + log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", + context.getId(), lastUnappliedIndex, toIndex); } - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); + long lastApplied = lastUnappliedIndex - 1; + for (long i = lastUnappliedIndex; i <= toIndex; i++) { + ReplicatedLogEntry logEntry = replicatedLog().get(i); + if(logEntry != null) { + lastApplied++; + batchRecoveredLogEntry(logEntry); + } else { + // Shouldn't happen but cover it anyway. + log.error("Log entry not found for index {}", i); + break; + } } - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); + context.setLastApplied(lastApplied); + context.setCommitIndex(lastApplied); } private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {