import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- private RaftActorContext context;
+ protected RaftActorContext context;
/**
* The in-memory journal
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
LOG.info("Applied snapshot to replicatedLog. " +
"snapshotIndex={}, snapshotTerm={}, journal-size={}",
} else if (message instanceof ReplicatedLogEntry) {
ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
- // Apply State immediately
+ LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
replicatedLog.append(logEntry);
- applyState(null, "recovery", logEntry.getData());
- context.setLastApplied(logEntry.getIndex());
- context.setCommitIndex(logEntry.getIndex());
+
+ } else if (message instanceof ApplyLogEntries) {
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+
+ LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ applyState(null, "recovery", replicatedLog.get(i).getData());
+ }
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
} else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+
} else if (message instanceof RecoveryCompleted) {
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size()
- );
- }
+ LOG.info(
+ "RecoveryCompleted - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
currentBehavior = switchBehavior(RaftState.Follower);
onStateChanged();
}
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ } else if (message instanceof ApplyLogEntries){
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+ LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ @Override
+ public void apply(ApplyLogEntries param) throws Exception {
+ }
+ });
+
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();