X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=6e1a13cf0c19669443b9273e1d2703a3ff2dede9;hb=34bc6ec632529a0dfe419aa7404bb42a456fbc96;hp=8270f2949a67cc9fc00f5180dce41872ca6a8a47;hpb=b5b204bafd8ee18692fc023cb2eae6e123369340;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 8270f2949a..6e1a13cf0c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -147,12 +148,20 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof ReplicatedLogEntry) { ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; - - // Apply State immediately + LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex()); replicatedLog.append(logEntry); - applyState(null, "recovery", logEntry.getData()); - context.setLastApplied(logEntry.getIndex()); - context.setCommitIndex(logEntry.getIndex()); + + } else if (message instanceof ApplyLogEntries) { + ApplyLogEntries ale = (ApplyLogEntries) message; + + LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}", + context.getLastApplied() + 1, ale.getToIndex()); + + for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { + applyState(null, "recovery", replicatedLog.get(i).getData()); + } + context.setLastApplied(ale.getToIndex()); + context.setCommitIndex(ale.getToIndex()); } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); @@ -187,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); + } else if (message instanceof ApplyLogEntries){ + ApplyLogEntries ale = (ApplyLogEntries) message; + LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + @Override + public void apply(ApplyLogEntries param) throws Exception { + } + }); + } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();