X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=6e1a13cf0c19669443b9273e1d2703a3ff2dede9;hb=34bc6ec632529a0dfe419aa7404bb42a456fbc96;hp=91bbeeca504b607147e39927dae1481ee9c4df6e;hpb=7225f60c394a26143f8421b0f99f2585699fa306;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 91bbeeca50..6e1a13cf0c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -96,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private RaftActorContext context; + protected RaftActorContext context; /** * The in-memory journal @@ -134,6 +135,7 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); LOG.info("Applied snapshot to replicatedLog. " + "snapshotIndex={}, snapshotTerm={}, journal-size={}", @@ -146,27 +148,36 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof ReplicatedLogEntry) { ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; - - // Apply State immediately + LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex()); replicatedLog.append(logEntry); - applyState(null, "recovery", logEntry.getData()); - context.setLastApplied(logEntry.getIndex()); - context.setCommitIndex(logEntry.getIndex()); + + } else if (message instanceof ApplyLogEntries) { + ApplyLogEntries ale = (ApplyLogEntries) message; + + LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}", + context.getLastApplied() + 1, ale.getToIndex()); + + for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { + applyState(null, "recovery", replicatedLog.get(i).getData()); + } + context.setLastApplied(ale.getToIndex()); + context.setCommitIndex(ale.getToIndex()); + } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { - if(LOG.isDebugEnabled()) { - LOG.debug( - "RecoveryCompleted - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog.lastIndex(), replicatedLog.snapshotIndex, - replicatedLog.snapshotTerm, replicatedLog.size() - ); - } + LOG.info( + "RecoveryCompleted - Switching actor to Follower - " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); currentBehavior = switchBehavior(RaftState.Follower); onStateChanged(); } @@ -185,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); + } else if (message instanceof ApplyLogEntries){ + ApplyLogEntries ale = (ApplyLogEntries) message; + LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + @Override + public void apply(ApplyLogEntries param) throws Exception { + } + }); + } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();