Merge "bug 1888 - FRM Flow Listener registration fail"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 190f1bd409e6c69ad8f8d8df30a6eaee7a04b3a7..6e1a13cf0c19669443b9273e1d2703a3ff2dede9 100644 (file)
@@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -96,7 +97,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private RaftActorContext context;
+    protected RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -134,6 +135,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
+            context.setCommitIndex(snapshot.getLastAppliedIndex());
 
             LOG.info("Applied snapshot to replicatedLog. " +
                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -146,18 +148,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
-            // Apply State immediately
+            LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
             replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+
+        } else if (message instanceof ApplyLogEntries) {
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+
+            LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+                context.getLastApplied() + 1, ale.getToIndex());
+
+            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+                applyState(null, "recovery", replicatedLog.get(i).getData());
+            }
+            context.setLastApplied(ale.getToIndex());
+            context.setCommitIndex(ale.getToIndex());
+
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
         } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                ((UpdateElectionTerm) message).getVotedFor());
+
         } else if (message instanceof RecoveryCompleted) {
-            LOG.debug(
+            LOG.info(
                 "RecoveryCompleted - Switching actor to Follower - " +
                     "Persistence Id =  " + persistenceId() +
                     " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
@@ -173,19 +187,33 @@ public abstract class RaftActor extends UntypedPersistentActor {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
-            LOG.debug("Applying state for log index {} data {}",
-                applyState.getReplicatedLogEntry().getIndex(),
-                applyState.getReplicatedLogEntry().getData());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Applying state for log index {} data {}",
+                    applyState.getReplicatedLogEntry().getIndex(),
+                    applyState.getReplicatedLogEntry().getData());
+            }
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
-            LOG.debug("ApplySnapshot called on Follower Actor " +
-                "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
-                snapshot.getLastAppliedTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                    snapshot.getLastAppliedTerm()
+                );
+            }
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
@@ -253,7 +281,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
-                LOG.debug("onReceiveCommand: message:" + message.getClass());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                }
             }
 
             RaftState state =
@@ -294,7 +324,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
-        LOG.debug("Persist data {}", replicatedLogEntry);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Persist data {}", replicatedLogEntry);
+        }
 
         replicatedLog
             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
@@ -483,8 +515,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-            + peerAddress);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+                + peerAddress);
+        }
 
         return peerAddress;
     }
@@ -584,10 +618,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
 
-                            LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                            LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
-                            LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                            LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
+                                LOG.debug("Snapshot Capture lastApplied:{} ",
+                                    context.getLastApplied());
+                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                            }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
                             getSelf().tell(new CaptureSnapshot(
@@ -639,8 +676,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public void update(long currentTerm, String votedFor) {
-            LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+            }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;
         }