X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupport.java;h=57603a505833af58b99d2916eaa5ed2a09a065c0;hp=5f33c738e1e44e972b5a7962302f22f177499899;hb=e3a22ae5edead2319553bb4dfce59e359386d535;hpb=ebaf3d71465066033d5882c61cdd2ec63b29d980 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5f33c738e1..57603a5058 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -11,10 +11,10 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; import com.google.common.base.Stopwatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries; import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; @@ -57,6 +57,9 @@ class RaftActorRecoverySupport { onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); } else if (message instanceof DeleteEntries) { replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) { + // Handle this message for backwards compatibility with pre-Lithium versions. + replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex()); } else if (message instanceof UpdateElectionTerm) { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); @@ -104,31 +107,43 @@ class RaftActorRecoverySupport { cohort.applyRecoverySnapshot(snapshot.getState()); timer.stop(); - log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + - replicatedLog().size(), context.getId(), timer.toString(), - replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); + log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}", + context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); } private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(log.isDebugEnabled()) { - log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex()); + log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(), + logEntry.getIndex(), logEntry.size()); } replicatedLog().append(logEntry); } private void onRecoveredApplyLogEntries(long toIndex) { + long lastUnappliedIndex = context.getLastApplied() + 1; + if(log.isDebugEnabled()) { - log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getId(), context.getLastApplied() + 1, toIndex); + log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", + context.getId(), lastUnappliedIndex, toIndex); } - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); + long lastApplied = lastUnappliedIndex - 1; + for (long i = lastUnappliedIndex; i <= toIndex; i++) { + ReplicatedLogEntry logEntry = replicatedLog().get(i); + if(logEntry != null) { + lastApplied++; + batchRecoveredLogEntry(logEntry); + } else { + // Shouldn't happen but cover it anyway. + log.error("Log entry not found for index {}", i); + break; + } } - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); + context.setLastApplied(lastApplied); + context.setCommitIndex(lastApplied); } private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {