import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
} else if (message instanceof ReplicatedLogEntry) {
ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
- // Apply State immediately
+ LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
replicatedLog.append(logEntry);
- applyState(null, "recovery", logEntry.getData());
- context.setLastApplied(logEntry.getIndex());
- context.setCommitIndex(logEntry.getIndex());
+
+ } else if (message instanceof ApplyLogEntries) {
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+
+ LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ applyState(null, "recovery", replicatedLog.get(i).getData());
+ }
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ } else if (message instanceof ApplyLogEntries){
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+ LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ @Override
+ public void apply(ApplyLogEntries param) throws Exception {
+ }
+ });
+
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();