Merge "bug 1888 - FRM Flow Listener registration fail"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 91bbeeca504b607147e39927dae1481ee9c4df6e..6e1a13cf0c19669443b9273e1d2703a3ff2dede9 100644 (file)
@@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -96,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private RaftActorContext context;
+    protected RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -134,6 +135,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
+            context.setCommitIndex(snapshot.getLastAppliedIndex());
 
             LOG.info("Applied snapshot to replicatedLog. " +
                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -146,27 +148,36 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
-            // Apply State immediately
+            LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
             replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+
+        } else if (message instanceof ApplyLogEntries) {
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+
+            LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+                context.getLastApplied() + 1, ale.getToIndex());
+
+            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+                applyState(null, "recovery", replicatedLog.get(i).getData());
+            }
+            context.setLastApplied(ale.getToIndex());
+            context.setCommitIndex(ale.getToIndex());
+
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
         } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                ((UpdateElectionTerm) message).getVotedFor());
+
         } else if (message instanceof RecoveryCompleted) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "RecoveryCompleted - Switching actor to Follower - " +
-                        "Persistence Id =  " + persistenceId() +
-                        " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                        "journal-size={}",
-                    replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                    replicatedLog.snapshotTerm, replicatedLog.size()
-                );
-            }
+            LOG.info(
+                "RecoveryCompleted - Switching actor to Follower - " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "journal-size={}",
+                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
             onStateChanged();
         }
@@ -185,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();