Merge "bug 1888 - FRM Flow Listener registration fail"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 8270f2949a67cc9fc00f5180dce41872ca6a8a47..6e1a13cf0c19669443b9273e1d2703a3ff2dede9 100644 (file)
@@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -147,12 +148,20 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
-            // Apply State immediately
+            LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
             replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+
+        } else if (message instanceof ApplyLogEntries) {
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+
+            LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+                context.getLastApplied() + 1, ale.getToIndex());
+
+            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+                applyState(null, "recovery", replicatedLog.get(i).getData());
+            }
+            context.setLastApplied(ale.getToIndex());
+            context.setCommitIndex(ale.getToIndex());
 
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
@@ -187,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();